
Commit bb29203

Merge remote-tracking branch 'apache/main' into feature/9421

alamb committed Mar 9, 2024
2 parents c0ce362 + afd0f90

Showing 68 changed files with 1,642 additions and 963 deletions.
13 changes: 8 additions & 5 deletions .github/workflows/rust.yml
@@ -79,17 +79,20 @@ jobs:
 
       # Ensure that the datafusion crate can be built with only a subset of the function
       # packages enabled.
-      - name: Check function packages (array_expressions)
-        run: cargo check --no-default-features --features=array_expressions -p datafusion
-
-      - name: Check function packages (datetime_expressions)
-        run: cargo check --no-default-features --features=datetime_expressions -p datafusion
 
       - name: Check function packages (encoding_expressions)
         run: cargo check --no-default-features --features=encoding_expressions -p datafusion
 
       - name: Check function packages (math_expressions)
         run: cargo check --no-default-features --features=math_expressions -p datafusion
 
+      - name: Check function packages (array_expressions)
+        run: cargo check --no-default-features --features=array_expressions -p datafusion
+
+      - name: Check function packages (datetime_expressions)
+        run: cargo check --no-default-features --features=datetime_expressions -p datafusion
+
+      - name: Check function packages (regex_expressions)
+        run: cargo check --no-default-features --features=regex_expressions -p datafusion
 
       - name: Check Cargo.lock for datafusion-cli
         run: |
40 changes: 21 additions & 19 deletions datafusion-cli/Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion datafusion-examples/examples/to_timestamp.rs
@@ -121,7 +121,7 @@ async fn main() -> Result<()> {
         .collect()
         .await;
 
-    let expected = "Error parsing timestamp from '01-14-2023 01/01/30' using format '%d-%m-%Y %H:%M:%S': input contains invalid characters";
+    let expected = "Execution error: Error parsing timestamp from '01-14-2023 01/01/30' using format '%d-%m-%Y %H:%M:%S': input is out of range";
     assert_contains!(result.unwrap_err().to_string(), expected);
 
     // note that using arrays for the chrono formats is not supported
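The updated expectation tracks chrono 0.4.35, where a parsed field that exceeds its valid range (here, a month of 14) is reported as out of range rather than as invalid characters, and DataFusion now prefixes the message with its error class. A sketch of the underlying chrono behavior (assuming chrono 0.4.35; illustrative only, not part of this commit):

    use chrono::NaiveDateTime;

    fn main() {
        // The "%m" field parses the month as 14; chrono 0.4.35 reports this
        // as an out-of-range error.
        let err = NaiveDateTime::parse_from_str("01-14-2023 01/01/30", "%d-%m-%Y %H:%M:%S")
            .unwrap_err();
        assert!(err.to_string().contains("out of range"));
    }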
3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
@@ -588,6 +588,9 @@ config_namespace! {
         /// When set to true, the explain statement will print operator statistics
         /// for physical plans
         pub show_statistics: bool, default = false
+
+        /// When set to true, the explain statement will print the partition sizes
+        pub show_sizes: bool, default = true
     }
 }

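A minimal sketch of toggling the new flag, assuming the config_namespace! macro exposes it as `datafusion.explain.show_sizes` alongside the existing `datafusion.explain.show_statistics` key:

    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // Build a session with partition-size output disabled (it defaults to true).
        let config = SessionConfig::new().set_bool("datafusion.explain.show_sizes", false);
        let ctx = SessionContext::new_with_config(config);
        // The flag should also be settable per session via SQL.
        ctx.sql("SET datafusion.explain.show_sizes = false").await?;
        Ok(())
    }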
4 changes: 4 additions & 0 deletions datafusion/common/src/scalar/mod.rs
@@ -5826,6 +5826,7 @@ mod tests {
                 .unwrap()
                 .and_hms_opt(hour, minute, second)
                 .unwrap()
+                .and_utc()
                 .timestamp(),
             ),
             None,
@@ -5838,6 +5839,7 @@
                 .unwrap()
                 .and_hms_milli_opt(hour, minute, second, millisec)
                 .unwrap()
+                .and_utc()
                 .timestamp_millis(),
             ),
             None,
@@ -5850,6 +5852,7 @@
                 .unwrap()
                 .and_hms_micro_opt(hour, minute, second, microsec)
                 .unwrap()
+                .and_utc()
                 .timestamp_micros(),
             ),
             None,
@@ -5862,6 +5865,7 @@
                 .unwrap()
                 .and_hms_nano_opt(hour, minute, second, nanosec)
                 .unwrap()
+                .and_utc()
                 .timestamp_nanos_opt()
                 .unwrap(),
             ),
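These additions track the chrono 0.4.35 deprecations: the timestamp accessors on NaiveDateTime are deprecated in favor of converting to DateTime<Utc> first via and_utc(). A minimal before/after sketch (assuming chrono 0.4.35; illustrative only):

    use chrono::NaiveDate;

    fn main() {
        let dt = NaiveDate::from_ymd_opt(2024, 3, 9)
            .unwrap()
            .and_hms_opt(12, 30, 0)
            .unwrap();
        // Deprecated since chrono 0.4.35: dt.timestamp()
        // The replacement goes through an explicit UTC conversion:
        let secs = dt.and_utc().timestamp();
        assert_eq!(secs, 1_709_987_400);
    }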
6 changes: 5 additions & 1 deletion datafusion/core/Cargo.toml
@@ -60,7 +60,11 @@ force_hash_collisions = []
 math_expressions = ["datafusion-functions/math_expressions"]
 parquet = ["datafusion-common/parquet", "dep:parquet"]
 pyarrow = ["datafusion-common/pyarrow", "parquet"]
-regex_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion-optimizer/regex_expressions"]
+regex_expressions = [
+    "datafusion-physical-expr/regex_expressions",
+    "datafusion-optimizer/regex_expressions",
+    "datafusion-functions/regex_expressions",
+]
 serde = ["arrow-schema/serde"]
 unicode_expressions = [
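This wiring means the new CI step above, `cargo check --no-default-features --features=regex_expressions -p datafusion`, now exercises the regex functions in datafusion-functions as well, not just the datafusion-physical-expr and datafusion-optimizer halves of the feature.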
76 changes: 45 additions & 31 deletions datafusion/core/src/datasource/listing/table.rs
@@ -62,6 +62,7 @@ use datafusion_physical_expr::{
 
 use async_trait::async_trait;
 use futures::{future, stream, StreamExt, TryStreamExt};
+use object_store::ObjectStore;
 
 /// Configuration for creating a [`ListingTable`]
 #[derive(Debug, Clone)]
@@ -450,7 +451,7 @@ impl ListingOptions {
 }
 
 /// Reads data from one or more files via an
-/// [`ObjectStore`](object_store::ObjectStore). For example, from
+/// [`ObjectStore`]. For example, from
 /// local files or objects from AWS S3. Implements [`TableProvider`],
 /// a DataFusion data source.
 ///
@@ -844,38 +845,14 @@ impl ListingTable {
         let files = file_list
             .map(|part_file| async {
                 let part_file = part_file?;
-                let mut statistics_result = Statistics::new_unknown(&self.file_schema);
                 if self.options.collect_stat {
-                    let statistics_cache = self.collected_statistics.clone();
-                    match statistics_cache.get_with_extra(
-                        &part_file.object_meta.location,
-                        &part_file.object_meta,
-                    ) {
-                        Some(statistics) => {
-                            statistics_result = statistics.as_ref().clone()
-                        }
-                        None => {
-                            let statistics = self
-                                .options
-                                .format
-                                .infer_stats(
-                                    ctx,
-                                    &store,
-                                    self.file_schema.clone(),
-                                    &part_file.object_meta,
-                                )
-                                .await?;
-                            statistics_cache.put_with_extra(
-                                &part_file.object_meta.location,
-                                statistics.clone().into(),
-                                &part_file.object_meta,
-                            );
-                            statistics_result = statistics;
-                        }
-                    }
+                    let statistics =
+                        self.do_collect_statistics(ctx, &store, &part_file).await?;
+                    Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
+                } else {
+                    Ok((part_file, Statistics::new_unknown(&self.file_schema)))
+                        as Result<(PartitionedFile, Statistics)>
                 }
-                Ok((part_file, statistics_result))
-                    as Result<(PartitionedFile, Statistics)>
             })
             .boxed()
             .buffered(ctx.config_options().execution.meta_fetch_concurrency);
@@ -893,6 +870,43 @@ impl ListingTable {
             statistics,
         ))
     }
+
+    /// Collects statistics for a given partitioned file.
+    ///
+    /// This method first checks if the statistics for the given file are already cached.
+    /// If they are, it returns the cached statistics.
+    /// If they are not, it infers the statistics from the file and stores them in the cache.
+    async fn do_collect_statistics<'a>(
+        &'a self,
+        ctx: &SessionState,
+        store: &Arc<dyn ObjectStore>,
+        part_file: &PartitionedFile,
+    ) -> Result<Statistics> {
+        let statistics_cache = self.collected_statistics.clone();
+        return match statistics_cache
+            .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
+        {
+            Some(statistics) => Ok(statistics.as_ref().clone()),
+            None => {
+                let statistics = self
+                    .options
+                    .format
+                    .infer_stats(
+                        ctx,
+                        store,
+                        self.file_schema.clone(),
+                        &part_file.object_meta,
+                    )
+                    .await?;
+                statistics_cache.put_with_extra(
+                    &part_file.object_meta.location,
+                    statistics.clone().into(),
+                    &part_file.object_meta,
+                );
+                Ok(statistics)
+            }
+        };
+    }
 }
 
 #[cfg(test)]
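Factoring the lookup into do_collect_statistics keeps the buffered stream closure above compact and makes the get-or-compute flow explicit. Reduced to a standalone sketch, with a plain HashMap standing in for the listing table's statistics cache (illustrative only, not this commit's code):

    use std::collections::HashMap;
    use std::sync::Mutex;

    struct Cache(Mutex<HashMap<String, u64>>);

    impl Cache {
        // Return the cached value for `key`, computing and storing it on a miss.
        fn get_or_compute(&self, key: &str, compute: impl FnOnce() -> u64) -> u64 {
            let mut map = self.0.lock().unwrap();
            match map.get(key) {
                Some(v) => *v,
                None => {
                    let v = compute();
                    map.insert(key.to_string(), v);
                    v
                }
            }
        }
    }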
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/memory.rs
@@ -216,9 +216,13 @@ impl TableProvider for MemTable {
             let inner_vec = arc_inner_vec.read().await;
             partitions.push(inner_vec.clone())
         }
+
         let mut exec =
             MemoryExec::try_new(&partitions, self.schema(), projection.cloned())?;
+
+        let show_sizes = state.config_options().explain.show_sizes;
+        exec = exec.with_show_sizes(show_sizes);
 
         // add sort information if present
         let sort_order = self.sort_order.lock();
         if !sort_order.is_empty() {
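With show_sizes left at its default of true, MemoryExec's EXPLAIN line keeps including its per-partition size list; the new flag exists so plans over in-memory tables with many partitions can be printed without that list dominating the output. (The exact EXPLAIN rendering is an implementation detail and may vary across versions; see the configuration example above for how to toggle the flag.)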
21 changes: 11 additions & 10 deletions datafusion/core/tests/parquet/mod.rs
@@ -27,7 +27,7 @@ use arrow::{
     record_batch::RecordBatch,
     util::pretty::pretty_format_batches,
 };
-use chrono::{Datelike, Duration};
+use chrono::{Datelike, Duration, TimeDelta};
 use datafusion::{
     datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
     physical_plan::{accept, metrics::MetricsSet, ExecutionPlan, ExecutionPlanVisitor},
@@ -310,6 +310,7 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch {
             offset_nanos
                 + t.parse::<chrono::NaiveDateTime>()
                     .unwrap()
+                    .and_utc()
                     .timestamp_nanos_opt()
                     .unwrap()
         })
@@ -459,7 +460,7 @@ fn make_date_batch(offset: Duration) -> RecordBatch {
                 .unwrap()
                 .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
             let t = t + offset;
-            t.timestamp_millis()
+            t.and_utc().timestamp_millis()
         })
     })
     .collect::<Vec<_>>();
@@ -511,18 +512,18 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
     match scenario {
         Scenario::Timestamps => {
             vec![
-                make_timestamp_batch(Duration::seconds(0)),
-                make_timestamp_batch(Duration::seconds(10)),
-                make_timestamp_batch(Duration::minutes(10)),
-                make_timestamp_batch(Duration::days(10)),
+                make_timestamp_batch(TimeDelta::try_seconds(0).unwrap()),
+                make_timestamp_batch(TimeDelta::try_seconds(10).unwrap()),
+                make_timestamp_batch(TimeDelta::try_minutes(10).unwrap()),
+                make_timestamp_batch(TimeDelta::try_days(10).unwrap()),
             ]
         }
         Scenario::Dates => {
             vec![
-                make_date_batch(Duration::days(0)),
-                make_date_batch(Duration::days(10)),
-                make_date_batch(Duration::days(300)),
-                make_date_batch(Duration::days(3600)),
+                make_date_batch(TimeDelta::try_days(0).unwrap()),
+                make_date_batch(TimeDelta::try_days(10).unwrap()),
+                make_date_batch(TimeDelta::try_days(300).unwrap()),
+                make_date_batch(TimeDelta::try_days(3600).unwrap()),
             ]
         }
         Scenario::Int32 => {
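The Duration-to-TimeDelta changes come from the same chrono upgrade: in chrono 0.4.35, Duration is an alias for TimeDelta, and the panicking constructors (Duration::seconds, Duration::days, ...) emit deprecation warnings in favor of the fallible try_* forms. A small sketch of the migration (assuming chrono 0.4.35):

    use chrono::TimeDelta;

    fn main() {
        // Before: chrono::Duration::days(10), which panics on overflow.
        // After: the fallible constructor returns Option<TimeDelta>.
        let ten_days = TimeDelta::try_days(10).unwrap();
        assert_eq!(ten_days.num_seconds(), 10 * 86_400);
    }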
2 changes: 1 addition & 1 deletion datafusion/core/tests/parquet/page_pruning.rs
@@ -356,7 +356,7 @@ async fn prune_date64() {
         .parse::<chrono::NaiveDate>()
         .unwrap()
         .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
-    let date = ScalarValue::Date64(Some(date.timestamp_millis()));
+    let date = ScalarValue::Date64(Some(date.and_utc().timestamp_millis()));
 
     let output = ContextWithParquet::new(Scenario::Dates, Page)
         .await
(Diffs for the remaining 58 changed files are not shown.)

