-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use DataFusionError instead of ArrowError in SendableRecordBatchStream #5101
Conversation
@tustvold please check whenever you have time, the code rotting fast :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me, mostly just some suggestions to use ?
instead of map_err(Into::into)
in a couple of places.
Given the intrusiveness of this change, I think we should leave this open for a few days in case there are any objections.
@@ -400,12 +399,9 @@ impl NestedLoopJoinStream { | |||
let mut left_indices_builder = UInt64Builder::new(); | |||
let mut right_indices_builder = UInt32Builder::new(); | |||
let left_right_indices = match indices_result { | |||
Err(_) => { | |||
// TODO why the type of result stream is `Result<T, ArrowError>`, and not the `DataFusionError` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😆
.collect::<Result<Vec<_>, ArrowError>>() | ||
.map_err(Into::<DataFusionError>::into)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.collect::<Result<Vec<_>, ArrowError>>() | |
.map_err(Into::<DataFusionError>::into)?; | |
.collect::<Result<Vec<_>, ArrowError>>()?; |
.collect::<Result<Vec<_>, ArrowError>>() | ||
.map_err(Into::<DataFusionError>::into)? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.collect::<Result<Vec<_>, ArrowError>>() | |
.map_err(Into::<DataFusionError>::into)? | |
.collect::<Result<Vec<_>, ArrowError>>()? |
@@ -821,7 +820,7 @@ pub(crate) fn build_batch_from_indices( | |||
}; | |||
columns.push(array); | |||
} | |||
RecordBatch::try_new(Arc::new(schema.clone()), columns) | |||
RecordBatch::try_new(Arc::new(schema.clone()), columns).map_err(Into::into) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RecordBatch::try_new(Arc::new(schema.clone()), columns).map_err(Into::into) | |
Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) |
.and_then(|b| { | ||
self.pc_projector | ||
.project(b, partition_values) | ||
.map_err(|e| ArrowError::ExternalError(e.into())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed, project
already returns ArrowError
AFAICT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is project from DataFusion, it returns DataFusionError
.and_then(|array| { | ||
Ok(as_boolean_array(&array)?) | ||
// apply filter array to record batch | ||
.and_then(|filter_array| filter_record_batch(batch, filter_array)) | ||
.and_then(|filter_array| { | ||
filter_record_batch(batch, filter_array).map_err(Into::into) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
filter_record_batch(batch, filter_array).map_err(Into::into) | |
Ok(filter_record_batch(batch, filter_array)?) |
) | ||
.map_err(Into::<DataFusionError>::into)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
) | |
.map_err(Into::<DataFusionError>::into)?; | |
)?; |
@@ -462,7 +461,7 @@ impl SortedPartitionByBoundedWindowStream { | |||
if let Some(columns_to_show) = columns_to_show { | |||
let n_generated = columns_to_show[0].len(); | |||
self.prune_state(n_generated)?; | |||
RecordBatch::try_new(schema, columns_to_show) | |||
RecordBatch::try_new(schema, columns_to_show).map_err(Into::into) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RecordBatch::try_new(schema, columns_to_show).map_err(Into::into) | |
Ok(RecordBatch::try_new(schema, columns_to_show)?) |
.collect::<Result<Vec<ArrayRef>, ArrowError>>() | ||
.map_err(Into::<DataFusionError>::into)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.collect::<Result<Vec<ArrayRef>, ArrowError>>() | |
.map_err(Into::<DataFusionError>::into)?; | |
.collect::<Result<Vec<ArrayRef>, ArrowError>>()?; |
|
||
// combine with the original cols | ||
// note the setup of window aggregates is that they newly calculated window | ||
// expression results are always appended to the columns | ||
let mut batch_columns = batch.columns().to_vec(); | ||
// calculate window cols | ||
batch_columns.extend_from_slice(&columns); | ||
RecordBatch::try_new(self.schema.clone(), batch_columns) | ||
RecordBatch::try_new(self.schema.clone(), batch_columns).map_err(Into::into) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RecordBatch::try_new(self.schema.clone(), batch_columns).map_err(Into::into) | |
Ok(RecordBatch::try_new(self.schema.clone(), batch_columns)?) |
Thanks @tustvold for your review. Fixed all the comments |
cc @liukun4515 , You may also be interested in this. |
Also @yahoNanJing and the Ballista team, this may result in chrun |
Benchmark runs are scheduled for baseline = 9c8bdfe and contender = 74b05fa. 74b05fa is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Closes #5039.
Rationale for this change
See #5039
What changes are included in this PR?
Replace ArrowError with DataFusionError in DF context
Are these changes tested?
Yes
Are there any user-facing changes?
User will get
DatafusionError::ArrowError
insteadarrow::ArrowError
for datafusion contexts