Skip to content

Commit

Permalink
Merge remote-tracking branch 'public/main' into generate_series
Browse files Browse the repository at this point in the history
  • Loading branch information
2010YOUY01 committed Nov 30, 2024
2 parents e775df4 + 523a455 commit cd38a60
Show file tree
Hide file tree
Showing 456 changed files with 7,369 additions and 4,102 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ datafusion/sqllogictests/test_files/tpch/data/*
# Scratch temp dir for sqllogictests
datafusion/sqllogictest/test_files/scratch*

# temp file for core
datafusion/core/*.parquet

# rat
filtered_rat.txt
rat.txt
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ members = [
"datafusion-examples/examples/ffi/ffi_module_loader",
"test-utils",
"benchmarks",
"datafusion/macros",
"datafusion/doc",
]
resolver = "2"

Expand Down Expand Up @@ -90,9 +92,8 @@ arrow-ipc = { version = "53.3.0", default-features = false, features = [
] }
arrow-ord = { version = "53.3.0", default-features = false }
arrow-schema = { version = "53.3.0", default-features = false }
arrow-string = { version = "53.3.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "=0.4.1"
bigdecimal = "0.4.6"
bytes = "1.4"
chrono = { version = "0.4.38", default-features = false }
ctor = "0.2.0"
Expand All @@ -101,6 +102,7 @@ datafusion = { path = "datafusion/core", version = "43.0.0", default-features =
datafusion-catalog = { path = "datafusion/catalog", version = "43.0.0" }
datafusion-common = { path = "datafusion/common", version = "43.0.0", default-features = false }
datafusion-common-runtime = { path = "datafusion/common-runtime", version = "43.0.0" }
datafusion-doc = { path = "datafusion/doc", version = "43.0.0" }
datafusion-execution = { path = "datafusion/execution", version = "43.0.0" }
datafusion-expr = { path = "datafusion/expr", version = "43.0.0" }
datafusion-expr-common = { path = "datafusion/expr-common", version = "43.0.0" }
Expand All @@ -112,6 +114,7 @@ datafusion-functions-nested = { path = "datafusion/functions-nested", version =
datafusion-functions-table = { path = "datafusion/functions-table", version = "43.0.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "43.0.0" }
datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "43.0.0" }
datafusion-macros = { path = "datafusion/macros", version = "43.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "43.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "43.0.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "43.0.0", default-features = false }
Expand All @@ -120,8 +123,6 @@ datafusion-physical-plan = { path = "datafusion/physical-plan", version = "43.0.
datafusion-proto = { path = "datafusion/proto", version = "43.0.0" }
datafusion-proto-common = { path = "datafusion/proto-common", version = "43.0.0" }
datafusion-sql = { path = "datafusion/sql", version = "43.0.0" }
datafusion-sqllogictest = { path = "datafusion/sqllogictest", version = "43.0.0" }
datafusion-substrait = { path = "datafusion/substrait", version = "43.0.0" }
doc-comment = "0.3"
env_logger = "0.11"
futures = "0.3"
Expand All @@ -130,7 +131,6 @@ hashbrown = { version = "0.14.5", features = ["raw"] }
indexmap = "2.0.0"
itertools = "0.13"
log = "^0.4"
num_cpus = "1.13.0"
object_store = { version = "0.11.0", default-features = false }
parking_lot = "0.12"
parquet = { version = "53.3.0", default-features = false, features = [
Expand Down
1 change: 0 additions & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
mimalloc = { version = "0.1", optional = true, default-features = false }
num_cpus = { workspace = true }
parquet = { workspace = true, default-features = true }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = { workspace = true }
Expand Down
5 changes: 4 additions & 1 deletion benchmarks/src/bin/external_aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_benchmarks::util::{BenchmarkRun, CommonOpt};
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{exec_datafusion_err, exec_err, DEFAULT_PARQUET_EXTENSION};

#[derive(Debug, StructOpt)]
Expand Down Expand Up @@ -325,7 +326,9 @@ impl ExternalAggrConfig {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(num_cpus::get())
self.common
.partitions
.unwrap_or(get_available_parallelism())
}

/// Parse memory limit from string to number of bytes
Expand Down
3 changes: 2 additions & 1 deletion benchmarks/src/bin/h2o.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datafusion::datasource::MemTable;
use datafusion::prelude::CsvReadOptions;
use datafusion::{arrow::util::pretty, error::Result, prelude::SessionContext};
use datafusion_benchmarks::util::BenchmarkRun;
use datafusion_common::utils::get_available_parallelism;
use std::path::PathBuf;
use std::sync::Arc;
use structopt::StructOpt;
Expand Down Expand Up @@ -91,7 +92,7 @@ async fn group_by(opt: &GroupBy) -> Result<()> {
.with_listing_options(ListingOptions::new(Arc::new(CsvFormat::default())))
.with_schema(Arc::new(schema));
let csv = ListingTable::try_new(listing_config)?;
let partition_size = num_cpus::get();
let partition_size = get_available_parallelism();
let memtable =
MemTable::load(Arc::new(csv), Some(partition_size), &ctx.state()).await?;
ctx.register_table("x", Arc::new(memtable))?;
Expand Down
5 changes: 4 additions & 1 deletion benchmarks/src/imdb/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};

use log::info;
Expand Down Expand Up @@ -468,7 +469,9 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(num_cpus::get())
self.common
.partitions
.unwrap_or(get_available_parallelism())
}
}

Expand Down
6 changes: 4 additions & 2 deletions benchmarks/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::test_util::parquet::TestParquetFile;
use datafusion_common::instant::Instant;

use datafusion_common::utils::get_available_parallelism;
use structopt::StructOpt;

/// Test performance of sorting large datasets
Expand Down Expand Up @@ -147,7 +147,9 @@ impl RunOpt {
rundata.start_new_case(title);
for i in 0..self.common.iterations {
let config = SessionConfig::new().with_target_partitions(
self.common.partitions.unwrap_or(num_cpus::get()),
self.common
.partitions
.unwrap_or(get_available_parallelism()),
);
let ctx = SessionContext::new_with_config(config);
let (rows, elapsed) =
Expand Down
5 changes: 4 additions & 1 deletion benchmarks/src/sort_tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{displayable, execute_stream};
use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::DEFAULT_PARQUET_EXTENSION;

use crate::util::{BenchmarkRun, CommonOpt};
Expand Down Expand Up @@ -315,6 +316,8 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(num_cpus::get())
self.common
.partitions
.unwrap_or(get_available_parallelism())
}
}
5 changes: 4 additions & 1 deletion benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};

use log::info;
Expand Down Expand Up @@ -296,7 +297,9 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(num_cpus::get())
self.common
.partitions
.unwrap_or(get_available_parallelism())
}
}

Expand Down
5 changes: 4 additions & 1 deletion benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use datafusion::prelude::SessionConfig;
use datafusion_common::utils::get_available_parallelism;
use structopt::StructOpt;

// Common benchmark options (don't use doc comments otherwise this doc
Expand Down Expand Up @@ -48,7 +49,9 @@ impl CommonOpt {
/// Modify the existing config appropriately
pub fn update_config(&self, config: SessionConfig) -> SessionConfig {
config
.with_target_partitions(self.partitions.unwrap_or(num_cpus::get()))
.with_target_partitions(
self.partitions.unwrap_or(get_available_parallelism()),
)
.with_batch_size(self.batch_size)
}
}
3 changes: 2 additions & 1 deletion benchmarks/src/util/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use datafusion::{error::Result, DATAFUSION_VERSION};
use datafusion_common::utils::get_available_parallelism;
use serde::{Serialize, Serializer};
use serde_json::Value;
use std::{
Expand Down Expand Up @@ -68,7 +69,7 @@ impl RunContext {
Self {
benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
datafusion_version: DATAFUSION_VERSION.to_owned(),
num_cpus: num_cpus::get(),
num_cpus: get_available_parallelism(),
start_time: SystemTime::now(),
arguments: std::env::args().skip(1).collect::<Vec<String>>(),
}
Expand Down
46 changes: 17 additions & 29 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cd38a60

Please sign in to comment.