
Commit

Merge remote-tracking branch 'refs/remotes/upstream/main' into feature/safe_timestamp_and_date_parsing
Omega359 committed Jul 10, 2024
2 parents 0a5570c + d99002c commit 9d04c3a
Showing 40 changed files with 3,597 additions and 330 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -85,7 +85,7 @@ bigdecimal = "=0.4.1"
bytes = "1.4"
chrono = { version = "0.4.34", default-features = false }
ctor = "0.2.0"
dashmap = "5.5.0"
dashmap = "6.0.1"
datafusion = { path = "datafusion/core", version = "40.0.0", default-features = false }
datafusion-common = { path = "datafusion/common", version = "40.0.0", default-features = false }
datafusion-common-runtime = { path = "datafusion/common-runtime", version = "40.0.0" }
43 changes: 25 additions & 18 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
@@ -73,7 +73,7 @@ mimalloc = { version = "0.1", default-features = false }
num_cpus = { workspace = true }
object_store = { workspace = true, features = ["aws", "http"] }
prost = { version = "0.12", default-features = false }
prost-derive = { version = "0.12", default-features = false }
prost-derive = { version = "0.13", default-features = false }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = { workspace = true }
tempfile = { workspace = true }
1 change: 1 addition & 0 deletions datafusion-examples/README.md
@@ -55,6 +55,7 @@ cargo run --example dataframe
- [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
- [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
+- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame against a local parquet file
- [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
8 changes: 7 additions & 1 deletion datafusion/core/src/lib.rs
@@ -641,5 +641,11 @@ doc_comment::doctest!(
#[cfg(doctest)]
doc_comment::doctest!(
"../../../docs/source/library-user-guide/using-the-sql-api.md",
-library_user_guide_example_usage
+library_user_guide_sql_api
);
+
+#[cfg(doctest)]
+doc_comment::doctest!(
+"../../../docs/source/library-user-guide/using-the-dataframe-api.md",
+library_user_guide_dataframe_api
+);
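
For context on the pattern used here: `doc_comment::doctest!` embeds a markdown guide as a doc comment, so rustdoc compiles and runs the guide's Rust code fences as doctests. A rough hand-written equivalent (a sketch of the mechanism only; the crate's actual expansion may differ) is:

// Sketch: approximately what doc_comment::doctest!(path, name) does.
// The guide's contents become a doc comment on a module, so every Rust
// code fence in the markdown is compiled and executed by `cargo test --doc`,
// keeping the user guide from drifting out of sync with the real API.
#[cfg(doctest)]
#[doc = include_str!("../../../docs/source/library-user-guide/using-the-dataframe-api.md")]
pub mod library_user_guide_dataframe_api {}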
50 changes: 31 additions & 19 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -46,7 +46,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
-use datafusion_common::{DataFusionError, JoinSide};
+use datafusion_common::{internal_err, JoinSide};
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::{
utils::collect_columns, Partitioning, PhysicalExpr, PhysicalExprRef,
@@ -640,6 +640,7 @@ fn try_pushdown_through_hash_join(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
hash_join.on(),
+hash_join.left().schema().fields().len(),
) else {
return Ok(None);
};
@@ -649,8 +650,7 @@
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
filter,
-hash_join.left(),
-hash_join.right(),
+hash_join.left().schema().fields().len(),
) {
Some(updated_filter) => Some(updated_filter),
None => return Ok(None),
@@ -750,8 +750,7 @@ fn try_swapping_with_nested_loop_join(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
filter,
-nl_join.left(),
-nl_join.right(),
+nl_join.left().schema().fields().len(),
) {
Some(updated_filter) => Some(updated_filter),
None => return Ok(None),
@@ -806,6 +805,7 @@ fn try_swapping_with_sort_merge_join(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
sm_join.on(),
+sm_join.left().schema().fields().len(),
) else {
return Ok(None);
};
@@ -859,6 +859,7 @@ fn try_swapping_with_sym_hash_join(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
sym_join.on(),
+sym_join.left().schema().fields().len(),
) else {
return Ok(None);
};
@@ -868,8 +869,7 @@
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
filter,
-sym_join.left(),
-sym_join.right(),
+sym_join.left().schema().fields().len(),
) {
Some(updated_filter) => Some(updated_filter),
None => return Ok(None),
@@ -1090,6 +1090,7 @@ fn update_join_on(
proj_left_exprs: &[(Column, String)],
proj_right_exprs: &[(Column, String)],
hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
+left_field_size: usize,
) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
// TODO: Clippy wants the "map" call removed, but doing so generates
// a compilation error. Remove the clippy directive once this
@@ -1100,8 +1101,9 @@
.map(|(left, right)| (left, right))
.unzip();

-let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs);
-let new_right_columns = new_columns_for_join_on(&right_idx, proj_right_exprs);
+let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0);
+let new_right_columns =
+    new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);

match (new_left_columns, new_right_columns) {
(Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()),
@@ -1112,9 +1114,14 @@
/// This function generates a new set of columns to be used in a hash join
/// operation based on a set of equi-join conditions (`hash_join_on`) and a
/// list of projection expressions (`projection_exprs`).
+///
+/// Notes: Column indices in the projection expressions are based on the join schema,
+/// whereas the join on expressions are based on the join child schema. `column_index_offset`
+/// represents the offset between them.
fn new_columns_for_join_on(
hash_join_on: &[&PhysicalExprRef],
projection_exprs: &[(Column, String)],
+column_index_offset: usize,
) -> Option<Vec<PhysicalExprRef>> {
let new_columns = hash_join_on
.iter()
@@ -1130,6 +1137,8 @@ fn new_columns_for_join_on(
.enumerate()
.find(|(_, (proj_column, _))| {
column.name() == proj_column.name()
&& column.index() + column_index_offset
== proj_column.index()
})
.map(|(index, (_, alias))| Column::new(alias, index));
if let Some(new_column) = new_column {
@@ -1138,10 +1147,10 @@
// If the column is not found in the projection expressions,
// it means that the column is not projected. In this case,
// we cannot push the projection down.
-Err(DataFusionError::Internal(format!(
+internal_err!(
"Column {:?} not found in projection expressions",
column
-)))
+)
}
} else {
Ok(Transformed::no(expr))
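
The offset arithmetic above is easiest to see in isolation. Below is a self-contained toy model of the lookup in `new_columns_for_join_on`; `remap_on_column` and its tuple-based projection are hypothetical stand-ins, not DataFusion types. The join schema concatenates the left and right child schemas, so a right-side key at child index `i` appears in the join schema at `i + left_field_size`:

/// Toy model of the matching logic in `new_columns_for_join_on`: a join-on
/// column carries an index into its *child* schema, while projected columns
/// carry indices into the combined join schema, so right-side keys must be
/// shifted by the size of the left child's schema before comparing.
fn remap_on_column(
    name: &str,
    child_index: usize,                 // index within the join child's schema
    offset: usize,                      // 0 for left keys, left_field_size for right keys
    projection: &[(&str, usize, &str)], // (column name, join-schema index, output alias)
) -> Option<(String, usize)> {
    projection
        .iter()
        .enumerate()
        .find(|(_, (proj_name, proj_index, _))| {
            *proj_name == name && child_index + offset == *proj_index
        })
        .map(|(position, (_, _, alias))| (alias.to_string(), position))
}

fn main() {
    // Join schema: [l_a, l_b, l_c, r_a, r_b], so left_field_size = 3.
    // The projection keeps l_b (aliased "b") and r_a (aliased "ra").
    let projection = [("l_b", 1, "b"), ("r_a", 3, "ra")];
    // Right-side key r_a has child index 0; shifted by 3 it matches the
    // projected column at join-schema index 3, i.e. output position 1.
    assert_eq!(remap_on_column("r_a", 0, 3, &projection), Some(("ra".to_string(), 1)));
    // Left-side key l_b needs no shift and lands at output position 0.
    assert_eq!(remap_on_column("l_b", 1, 0, &projection), Some(("b".to_string(), 0)));
}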
@@ -1160,21 +1169,20 @@ fn update_join_filter(
projection_left_exprs: &[(Column, String)],
projection_right_exprs: &[(Column, String)],
join_filter: &JoinFilter,
join_left: &Arc<dyn ExecutionPlan>,
join_right: &Arc<dyn ExecutionPlan>,
left_field_size: usize,
) -> Option<JoinFilter> {
let mut new_left_indices = new_indices_for_join_filter(
join_filter,
JoinSide::Left,
projection_left_exprs,
-join_left.schema(),
+0,
)
.into_iter();
let mut new_right_indices = new_indices_for_join_filter(
join_filter,
JoinSide::Right,
projection_right_exprs,
-join_right.schema(),
+left_field_size,
)
.into_iter();

@@ -1204,20 +1212,24 @@
/// This function determines and returns a vector of indices representing the
/// positions of columns in `projection_exprs` that are involved in `join_filter`,
/// and correspond to a particular side (`join_side`) of the join operation.
+///
+/// Notes: Column indices in the projection expressions are based on the join schema,
+/// whereas the join filter is based on the join child schema. `column_index_offset`
+/// represents the offset between them.
fn new_indices_for_join_filter(
join_filter: &JoinFilter,
join_side: JoinSide,
projection_exprs: &[(Column, String)],
-join_child_schema: SchemaRef,
+column_index_offset: usize,
) -> Vec<usize> {
join_filter
.column_indices()
.iter()
.filter(|col_idx| col_idx.side == join_side)
.filter_map(|col_idx| {
-projection_exprs.iter().position(|(col, _)| {
-    col.name() == join_child_schema.fields()[col_idx.index].name()
-})
+projection_exprs
+    .iter()
+    .position(|(col, _)| col_idx.index + column_index_offset == col.index())
})
.collect()
}
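
A matching sketch for the filter path: `new_indices_for_join_filter` now identifies projected columns by offset-adjusted index rather than by resolving names through the join child's schema. `Side` and `filter_indices_after_projection` are hypothetical stand-ins for `JoinSide` and the real function:

/// Toy model of `new_indices_for_join_filter`: for each filter column on
/// the requested side, return its position among the projected columns,
/// matching on the offset-adjusted join-schema index.
#[derive(Clone, Copy, PartialEq)]
enum Side {
    Left,
    Right,
}

fn filter_indices_after_projection(
    filter_columns: &[(Side, usize)], // (side, index into that side's child schema)
    side: Side,
    projected: &[usize], // join-schema index referenced by each projected column
    offset: usize,       // 0 for Side::Left, left_field_size for Side::Right
) -> Vec<usize> {
    filter_columns
        .iter()
        .filter(|(s, _)| *s == side)
        .filter_map(|(_, child_index)| {
            projected.iter().position(|p| child_index + offset == *p)
        })
        .collect()
}

fn main() {
    // Join schema [l0, l1, l2, r0, r1] (left_field_size = 3); the
    // projection keeps join-schema columns 1 (l1) and 3 (r0).
    let filter_columns = [(Side::Left, 1), (Side::Right, 0)];
    assert_eq!(filter_indices_after_projection(&filter_columns, Side::Left, &[1, 3], 0), vec![0]);
    assert_eq!(filter_indices_after_projection(&filter_columns, Side::Right, &[1, 3], 3), vec![1]);
}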
