refactor: add get_available_parallelism function #13595

Merged 2 commits on Nov 29, 2024
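Across every file below, the change follows the same pattern: the inline `available_parallelism()` fallback is replaced with the new `datafusion_common::utils::get_available_parallelism` helper. A minimal, self-contained sketch of that before/after pattern (simplified names, not the exact DataFusion code):

```rust
use std::num::NonZero;
use std::thread::available_parallelism;

// Before: every call site repeated the fallback-to-1 dance inline.
fn partitions_before(cli_partitions: Option<usize>) -> usize {
    cli_partitions.unwrap_or(
        available_parallelism()
            .unwrap_or(NonZero::new(1).unwrap())
            .get(),
    )
}

// After: call sites delegate to one shared helper (inlined here so the
// sketch compiles on its own; in the PR it lives in datafusion_common::utils).
fn get_available_parallelism() -> usize {
    available_parallelism()
        .unwrap_or(NonZero::new(1).expect("literal value `1` shouldn't be zero"))
        .get()
}

fn partitions_after(cli_partitions: Option<usize>) -> usize {
    cli_partitions.unwrap_or(get_available_parallelism())
}

fn main() {
    // Both paths agree; the refactor is behavior-preserving.
    assert_eq!(partitions_before(None), partitions_after(None));
    assert_eq!(partitions_before(Some(4)), 4);
    println!("default partitions: {}", partitions_after(None));
}
```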
11 changes: 4 additions & 7 deletions benchmarks/src/bin/external_aggr.rs
@@ -18,11 +18,9 @@
//! external_aggr binary entrypoint

use std::collections::HashMap;
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::OnceLock;
use std::thread::available_parallelism;
use structopt::StructOpt;

use arrow::record_batch::RecordBatch;
@@ -41,6 +39,7 @@ use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_benchmarks::util::{BenchmarkRun, CommonOpt};
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{exec_datafusion_err, exec_err, DEFAULT_PARQUET_EXTENSION};

#[derive(Debug, StructOpt)]
@@ -327,11 +326,9 @@ impl ExternalAggrConfig {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
self.common
.partitions
.unwrap_or(get_available_parallelism())
}

/// Parse memory limit from string to number of bytes
7 changes: 2 additions & 5 deletions benchmarks/src/bin/h2o.rs
@@ -27,10 +27,9 @@ use datafusion::datasource::MemTable;
use datafusion::prelude::CsvReadOptions;
use datafusion::{arrow::util::pretty, error::Result, prelude::SessionContext};
use datafusion_benchmarks::util::BenchmarkRun;
use std::num::NonZero;
use datafusion_common::utils::get_available_parallelism;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;
use structopt::StructOpt;
use tokio::time::Instant;

@@ -93,9 +92,7 @@ async fn group_by(opt: &GroupBy) -> Result<()> {
.with_listing_options(ListingOptions::new(Arc::new(CsvFormat::default())))
.with_schema(Arc::new(schema));
let csv = ListingTable::try_new(listing_config)?;
let partition_size = available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get();
let partition_size = get_available_parallelism();
let memtable =
MemTable::load(Arc::new(csv), Some(partition_size), &ctx.state()).await?;
ctx.register_table("x", Arc::new(memtable))?;
11 changes: 4 additions & 7 deletions benchmarks/src/imdb/run.rs
@@ -15,10 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;

use super::{get_imdb_table_schema, get_query_sql, IMDB_TABLES};
use crate::util::{BenchmarkRun, CommonOpt};
@@ -37,6 +35,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};

use log::info;
@@ -470,11 +469,9 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
self.common
.partitions
.unwrap_or(get_available_parallelism())
}
}

12 changes: 4 additions & 8 deletions benchmarks/src/sort.rs
@@ -15,10 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;

use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};

@@ -30,7 +28,7 @@ use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::test_util::parquet::TestParquetFile;
use datafusion_common::instant::Instant;

use datafusion_common::utils::get_available_parallelism;
use structopt::StructOpt;

/// Test performance of sorting large datasets
@@ -149,11 +147,9 @@ impl RunOpt {
rundata.start_new_case(title);
for i in 0..self.common.iterations {
let config = SessionConfig::new().with_target_partitions(
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
),
self.common
.partitions
.unwrap_or(get_available_parallelism()),
);
let ctx = SessionContext::new_with_config(config);
let (rows, elapsed) =
11 changes: 4 additions & 7 deletions benchmarks/src/sort_tpch.rs
@@ -22,10 +22,8 @@
//! runs end-to-end sort queries and test the performance on multiple CPU cores.

use futures::StreamExt;
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;
use structopt::StructOpt;

use datafusion::datasource::file_format::parquet::ParquetFormat;
@@ -39,6 +37,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{displayable, execute_stream};
use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::DEFAULT_PARQUET_EXTENSION;

use crate::util::{BenchmarkRun, CommonOpt};
@@ -317,10 +316,8 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
self.common
.partitions
.unwrap_or(get_available_parallelism())
}
}
11 changes: 4 additions & 7 deletions benchmarks/src/tpch/run.rs
@@ -15,10 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;

use super::{
get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES,
@@ -39,6 +37,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};

use log::info;
@@ -298,11 +297,9 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
self.common
.partitions
.unwrap_or(get_available_parallelism())
}
}

9 changes: 2 additions & 7 deletions benchmarks/src/util/options.rs
@@ -15,9 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::{num::NonZero, thread::available_parallelism};

use datafusion::prelude::SessionConfig;
use datafusion_common::utils::get_available_parallelism;
use structopt::StructOpt;

// Common benchmark options (don't use doc comments otherwise this doc
@@ -51,11 +50,7 @@ impl CommonOpt {
pub fn update_config(&self, config: SessionConfig) -> SessionConfig {
config
.with_target_partitions(
self.partitions.unwrap_or(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
),
self.partitions.unwrap_or(get_available_parallelism()),
)
.with_batch_size(self.batch_size)
}
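For context, the `partitions` flag that feeds this `unwrap_or` is an `Option<usize>`; a hypothetical, trimmed-down sketch of how such a structopt option resolves to the detected parallelism when the flag is omitted (field and flag names are illustrative, not the exact `CommonOpt` definition):

```rust
use datafusion_common::utils::get_available_parallelism;
use structopt::StructOpt;

// Illustrative subset of the benchmark options; not the real CommonOpt.
#[derive(Debug, StructOpt)]
struct Opt {
    // Number of partitions to use; defaults to detected parallelism.
    #[structopt(short = "n", long = "partitions")]
    partitions: Option<usize>,
}

fn main() {
    let opt = Opt::from_args();
    // Omitting --partitions falls back to the shared helper.
    let partitions = opt.partitions.unwrap_or(get_available_parallelism());
    println!("running with {partitions} partitions");
}
```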
7 changes: 2 additions & 5 deletions benchmarks/src/util/run.rs
@@ -16,13 +16,12 @@
// under the License.

use datafusion::{error::Result, DATAFUSION_VERSION};
use datafusion_common::utils::get_available_parallelism;
use serde::{Serialize, Serializer};
use serde_json::Value;
use std::{
collections::HashMap,
num::NonZero,
path::Path,
thread::available_parallelism,
time::{Duration, SystemTime},
};

@@ -70,9 +69,7 @@ impl RunContext {
Self {
benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
datafusion_version: DATAFUSION_VERSION.to_owned(),
num_cpus: available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
num_cpus: get_available_parallelism(),
start_time: SystemTime::now(),
arguments: std::env::args().skip(1).collect::<Vec<String>>(),
}
7 changes: 3 additions & 4 deletions datafusion/common/src/config.rs
@@ -20,12 +20,11 @@
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{self, Display};
use std::num::NonZero;
use std::str::FromStr;
use std::thread::available_parallelism;

use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};

/// A macro that wraps a configuration struct and automatically derives
@@ -252,7 +251,7 @@ config_namespace! {
/// concurrency.
///
/// Defaults to the number of CPU cores on the system
pub target_partitions: usize, default = available_parallelism().unwrap_or(NonZero::new(1).unwrap()).get()
pub target_partitions: usize, default = get_available_parallelism()

/// The default time zone
///
@@ -268,7 +267,7 @@
/// This is mostly use to plan `UNION` children in parallel.
///
/// Defaults to the number of CPU cores on the system
pub planning_concurrency: usize, default = available_parallelism().unwrap_or(NonZero::new(1).unwrap()).get()
pub planning_concurrency: usize, default = get_available_parallelism()

/// When set to true, skips verifying that the schema produced by
/// planning the input of `LogicalPlan::Aggregate` exactly matches the
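A brief sketch of setting `target_partitions` programmatically instead of relying on the detected-core default, using only APIs that already appear in this diff (`SessionConfig::with_target_partitions`, `SessionContext::new_with_config`); the cap of 8 is an arbitrary example:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::utils::get_available_parallelism;

fn main() {
    // Default would be get_available_parallelism(); cap it for this session.
    let partitions = get_available_parallelism().min(8);
    let config = SessionConfig::new().with_target_partitions(partitions);
    let _ctx = SessionContext::new_with_config(config);
    println!("target_partitions set to {partitions}");
}
```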
12 changes: 12 additions & 0 deletions datafusion/common/src/utils/mod.rs
@@ -39,8 +39,10 @@ use sqlparser::parser::Parser;
use std::borrow::{Borrow, Cow};
use std::cmp::{min, Ordering};
use std::collections::HashSet;
use std::num::NonZero;
use std::ops::Range;
use std::sync::Arc;
use std::thread::available_parallelism;

/// Applies an optional projection to a [`SchemaRef`], returning the
/// projected schema
@@ -761,6 +763,16 @@ pub fn combine_limit(
(combined_skip, combined_fetch)
}

/// Returns the estimated number of threads available for parallel execution.
///
/// This is a wrapper around `std::thread::available_parallelism`, providing a default value
/// of `1` if the system's parallelism cannot be determined.
pub fn get_available_parallelism() -> usize {
available_parallelism()
.unwrap_or(NonZero::new(1).expect("literal value `1` shouldn't be zero"))
.get()
}

#[cfg(test)]
mod tests {
use crate::ScalarValue::Null;
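A short usage sketch of the new helper from a downstream crate (assumes `datafusion_common` as a dependency; the guarantee that the result is at least 1 follows directly from the fallback above):

```rust
use datafusion_common::utils::get_available_parallelism;

fn main() {
    // Falls back to 1 when the OS cannot report parallelism
    // (e.g. unsupported platform or missing permissions).
    let cores = get_available_parallelism();
    assert!(cores >= 1);
    println!("estimated available parallelism: {cores}");
}
```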
@@ -15,15 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use std::{cmp, num::NonZero, sync::Arc, thread::available_parallelism};
use std::{cmp, sync::Arc};

use datafusion::{
datasource::MemTable,
prelude::{SessionConfig, SessionContext},
};
use datafusion_catalog::TableProvider;
use datafusion_common::error::Result;
use datafusion_common::ScalarValue;
use datafusion_common::{error::Result, utils::get_available_parallelism};
use datafusion_expr::col;
use rand::{thread_rng, Rng};

@@ -73,9 +73,7 @@ impl SessionContextGenerator {
];

let max_batch_size = cmp::max(1, dataset_ref.total_rows_num);
let max_target_partitions = available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get();
let max_target_partitions = get_available_parallelism();

Self {
dataset: dataset_ref,
10 changes: 2 additions & 8 deletions datafusion/core/tests/sql/mod.rs
@@ -15,9 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::num::NonZero;
use std::sync::Arc;
use std::thread::available_parallelism;

use arrow::{
array::*, datatypes::*, record_batch::RecordBatch,
@@ -34,6 +32,7 @@ use datafusion::prelude::*;
use datafusion::test_util;
use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion::{execution::context::SessionContext, physical_plan::displayable};
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{assert_contains, assert_not_contains};
use object_store::path::Path;
use std::fs::File;
@@ -261,12 +260,7 @@ impl ExplainNormalizer {

// convert things like partitioning=RoundRobinBatch(16)
// to partitioning=RoundRobinBatch(NUM_CORES)
let needle = format!(
"RoundRobinBatch({})",
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get()
);
let needle = format!("RoundRobinBatch({})", get_available_parallelism());
replacements.push((needle, "RoundRobinBatch(NUM_CORES)".to_string()));

Self { replacements }
9 changes: 2 additions & 7 deletions datafusion/sqllogictest/bin/sqllogictests.rs
@@ -17,11 +17,10 @@

use std::ffi::OsStr;
use std::fs;
use std::num::NonZero;
use std::path::{Path, PathBuf};
use std::thread::available_parallelism;

use clap::Parser;
use datafusion_common::utils::get_available_parallelism;
use datafusion_sqllogictest::{DataFusion, TestContext};
use futures::stream::StreamExt;
use itertools::Itertools;
@@ -114,11 +113,7 @@ async fn run_tests() -> Result<()> {
.join()
})
// run up to num_cpus streams in parallel
.buffer_unordered(
available_parallelism()
.unwrap_or(NonZero::new(1).unwrap())
.get(),
)
.buffer_unordered(get_available_parallelism())
.flat_map(|result| {
// Filter out any Ok() leaving only the DataFusionErrors
futures::stream::iter(match result {
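The runner above bounds its fan-out with the same helper; a minimal sketch of that `buffer_unordered` pattern with placeholder work items (tokio and futures assumed as dependencies, as in the original binary):

```rust
use datafusion_common::utils::get_available_parallelism;
use futures::stream::StreamExt;

#[tokio::main]
async fn main() {
    let results: Vec<u64> = futures::stream::iter(0..16u64)
        .map(|i| async move {
            // Placeholder for "run one .slt test file".
            i * 2
        })
        // At most get_available_parallelism() futures run concurrently.
        .buffer_unordered(get_available_parallelism())
        .collect()
        .await;
    assert_eq!(results.len(), 16);
}
```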