Skip to content

Commit

Permalink
feat: implement repartitioned for DeltaScan (#2421)
Browse files Browse the repository at this point in the history
# Description
This implements repartitioned from the ExecutionPlan trait of DeltaScan.
Currently, Delta tables without partitions are read with all its files
in a single file group of the underlying `ParquetExec`. This seems to
mean that Delta tables without partitions are read without concurrency.
With repartitioned we can repartition the DeltaScan to get concurrency
when reading.
  • Loading branch information
jkylling authored Apr 16, 2024
1 parent 9736522 commit ebbdd69
Showing 1 changed file with 19 additions and 1 deletion.
20 changes: 19 additions & 1 deletion crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ use datafusion::physical_plan::{
use datafusion_common::scalar::ScalarValue;
use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion_common::{
Column, DFSchema, DataFusionError, Result as DataFusionResult, ToDFSchema,
config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult,
ToDFSchema,
};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::logical_plan::CreateExternalTable;
Expand Down Expand Up @@ -860,6 +861,23 @@ impl ExecutionPlan for DeltaScan {
fn statistics(&self) -> DataFusionResult<Statistics> {
self.parquet_scan.statistics()
}

fn repartitioned(
&self,
target_partitions: usize,
config: &ConfigOptions,
) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {
if let Some(parquet_scan) = self.parquet_scan.repartitioned(target_partitions, config)? {
Ok(Some(Arc::new(DeltaScan {
table_uri: self.table_uri.clone(),
config: self.config.clone(),
parquet_scan,
logical_schema: self.logical_schema.clone(),
})))
} else {
Ok(None)
}
}
}

pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
Expand Down

0 comments on commit ebbdd69

Please sign in to comment.