diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 01c17a2fc9..8a6ca27e4d 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -1085,6 +1085,45 @@ pub(crate) struct FindFiles { pub partition_scan: bool, } +fn join_batches_with_add_actions( + batches: Vec, + mut actions: HashMap, +) -> DeltaResult> { + // Given RecordBatches that contains `__delta_rs_path` perform a hash join + // with actions to obtain original add actions + + let mut files = Vec::new(); + for batch in batches { + let array = batch + .column_by_name(PATH_COLUMN) + .ok_or_else(|| { + DeltaTableError::Generic(format!("Unable to find column {}", PATH_COLUMN)) + })? + .as_any() + .downcast_ref::() + .ok_or(DeltaTableError::Generic(format!( + "Unable to downcast column {}", + PATH_COLUMN + )))?; + for path in array { + let path = path.ok_or(DeltaTableError::Generic(format!( + "{} cannot be null", + PATH_COLUMN + )))?; + + match actions.remove(path) { + Some(action) => files.push(action), + None => { + return Err(DeltaTableError::Generic( + "Unable to map __delta_rs_path to action.".to_owned(), + )) + } + } + } + } + Ok(files) +} + /// Determine which files contain a record that statisfies the predicate pub(crate) async fn find_files_scan<'a>( snapshot: &DeltaTableState, @@ -1094,9 +1133,8 @@ pub(crate) async fn find_files_scan<'a>( candidates: Vec<&'a Add>, state: &SessionState, expression: &Expr, -) -> DeltaResult> { - let mut files = Vec::new(); - let mut candidate_map: HashMap = HashMap::new(); +) -> DeltaResult> { + let mut candidate_map: HashMap = HashMap::new(); let table_partition_cols = snapshot .current_metadata() @@ -1115,7 +1153,7 @@ pub(crate) async fn find_files_scan<'a>( .or_default() .push(part); - candidate_map.insert(action.path.to_owned(), action); + candidate_map.insert(action.path.to_owned(), action.to_owned()); } let mut table_partition_cols = table_partition_cols @@ -1160,40 +1198,7 @@ pub(crate) async fn find_files_scan<'a>( let task_ctx = Arc::new(TaskContext::from(state)); let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?; - for batch in path_batches { - if batch.num_rows() > 1 { - return Err(DeltaTableError::Generic( - "Find files returned multiple records for batch".to_owned(), - )); - } - let array = batch - .column_by_name(PATH_COLUMN) - .ok_or_else(|| { - DeltaTableError::Generic(format!("Unable to find column {}", PATH_COLUMN)) - }) - .unwrap() - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DeltaTableError::Generic(format!("Unable to downcast column {}", PATH_COLUMN)) - })?; - - let path = - array.into_iter().next().flatten().ok_or_else(|| { - DeltaTableError::Generic(format!("{} cannot be null", PATH_COLUMN)) - })?; - - match candidate_map.remove(path) { - Some(action) => files.push(action), - None => { - return Err(DeltaTableError::Generic( - "Unable to map __delta_rs_path to action.".to_owned(), - )) - } - } - } - - Ok(files) + join_batches_with_add_actions(path_batches, candidate_map) } pub(crate) async fn scan_memory_table( @@ -1242,33 +1247,12 @@ pub(crate) async fn scan_memory_table( .select(vec![col(PATH_COLUMN)])?; let batches = df.collect().await?; - let mut map = HashMap::new(); - for action in actions { - map.insert(action.path.clone(), action); - } - let mut files = Vec::new(); + let map = actions + .into_iter() + .map(|action| (action.path.clone(), action)) + .collect::>(); - for batch in batches { - let array = batch - .column_by_name(PATH_COLUMN) - .unwrap() - .as_any() - .downcast_ref::() - .ok_or(DeltaTableError::Generic(format!( - "Unable to downcast column {}", - PATH_COLUMN - )))?; - for path in array { - let path = path.ok_or(DeltaTableError::Generic(format!( - "{} cannot be null", - PATH_COLUMN - )))?; - let value = map.remove(path).unwrap(); - files.push(value); - } - } - - Ok(files) + join_batches_with_add_actions(batches, map) } pub(crate) async fn find_files<'a>( @@ -1342,7 +1326,7 @@ pub(crate) async fn find_files<'a>( .await?; Ok(FindFiles { - candidates: candidates.into_iter().map(|s| s.to_owned()).collect(), + candidates, partition_scan: false, }) }