Skip to content

Commit

Permalink
cleanup unwraps
Browse files Browse the repository at this point in the history
  • Loading branch information
Blajda committed Jun 8, 2023
1 parent fc30ae4 commit 0f74bc7
Showing 1 changed file with 49 additions and 65 deletions.
114 changes: 49 additions & 65 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,45 @@ pub(crate) struct FindFiles {
pub partition_scan: bool,
}

fn join_batches_with_add_actions(
batches: Vec<RecordBatch>,
mut actions: HashMap<String, Add>,
) -> DeltaResult<Vec<Add>> {
// Given RecordBatches that contains `__delta_rs_path` perform a hash join
// with actions to obtain original add actions

let mut files = Vec::new();
for batch in batches {
let array = batch
.column_by_name(PATH_COLUMN)
.ok_or_else(|| {
DeltaTableError::Generic(format!("Unable to find column {}", PATH_COLUMN))
})?
.as_any()
.downcast_ref::<StringArray>()
.ok_or(DeltaTableError::Generic(format!(
"Unable to downcast column {}",
PATH_COLUMN
)))?;
for path in array {
let path = path.ok_or(DeltaTableError::Generic(format!(
"{} cannot be null",
PATH_COLUMN
)))?;

match actions.remove(path) {
Some(action) => files.push(action),
None => {
return Err(DeltaTableError::Generic(
"Unable to map __delta_rs_path to action.".to_owned(),
))
}
}
}
}
Ok(files)
}

/// Determine which files contain a record that statisfies the predicate
pub(crate) async fn find_files_scan<'a>(
snapshot: &DeltaTableState,
Expand All @@ -1094,9 +1133,8 @@ pub(crate) async fn find_files_scan<'a>(
candidates: Vec<&'a Add>,
state: &SessionState,
expression: &Expr,
) -> DeltaResult<Vec<&'a Add>> {
let mut files = Vec::new();
let mut candidate_map: HashMap<String, &'a Add> = HashMap::new();
) -> DeltaResult<Vec<Add>> {
let mut candidate_map: HashMap<String, Add> = HashMap::new();

let table_partition_cols = snapshot
.current_metadata()
Expand All @@ -1115,7 +1153,7 @@ pub(crate) async fn find_files_scan<'a>(
.or_default()
.push(part);

candidate_map.insert(action.path.to_owned(), action);
candidate_map.insert(action.path.to_owned(), action.to_owned());
}

let mut table_partition_cols = table_partition_cols
Expand Down Expand Up @@ -1160,40 +1198,7 @@ pub(crate) async fn find_files_scan<'a>(
let task_ctx = Arc::new(TaskContext::from(state));
let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?;

for batch in path_batches {
if batch.num_rows() > 1 {
return Err(DeltaTableError::Generic(
"Find files returned multiple records for batch".to_owned(),
));
}
let array = batch
.column_by_name(PATH_COLUMN)
.ok_or_else(|| {
DeltaTableError::Generic(format!("Unable to find column {}", PATH_COLUMN))
})
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
DeltaTableError::Generic(format!("Unable to downcast column {}", PATH_COLUMN))
})?;

let path =
array.into_iter().next().flatten().ok_or_else(|| {
DeltaTableError::Generic(format!("{} cannot be null", PATH_COLUMN))
})?;

match candidate_map.remove(path) {
Some(action) => files.push(action),
None => {
return Err(DeltaTableError::Generic(
"Unable to map __delta_rs_path to action.".to_owned(),
))
}
}
}

Ok(files)
join_batches_with_add_actions(path_batches, candidate_map)
}

pub(crate) async fn scan_memory_table(
Expand Down Expand Up @@ -1242,33 +1247,12 @@ pub(crate) async fn scan_memory_table(
.select(vec![col(PATH_COLUMN)])?;
let batches = df.collect().await?;

let mut map = HashMap::new();
for action in actions {
map.insert(action.path.clone(), action);
}
let mut files = Vec::new();
let map = actions
.into_iter()
.map(|action| (action.path.clone(), action))
.collect::<HashMap<String, Add>>();

for batch in batches {
let array = batch
.column_by_name(PATH_COLUMN)
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.ok_or(DeltaTableError::Generic(format!(
"Unable to downcast column {}",
PATH_COLUMN
)))?;
for path in array {
let path = path.ok_or(DeltaTableError::Generic(format!(
"{} cannot be null",
PATH_COLUMN
)))?;
let value = map.remove(path).unwrap();
files.push(value);
}
}

Ok(files)
join_batches_with_add_actions(batches, map)
}

pub(crate) async fn find_files<'a>(
Expand Down Expand Up @@ -1342,7 +1326,7 @@ pub(crate) async fn find_files<'a>(
.await?;

Ok(FindFiles {
candidates: candidates.into_iter().map(|s| s.to_owned()).collect(),
candidates,
partition_scan: false,
})
}
Expand Down

0 comments on commit 0f74bc7

Please sign in to comment.