Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use DataFusionError instead of ArrowError in SendableRecordBatchStream #5101

Merged
merged 2 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,6 @@ mod tests {
use crate::{assert_batches_sorted_eq, physical_plan::common};
use arrow::array::{Float64Array, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::expressions::{lit, ApproxDistinct, Count, Median};
Expand Down Expand Up @@ -1038,7 +1037,7 @@ mod tests {
}

impl Stream for TestYieldingStream {
type Item = ArrowResult<RecordBatch>;
type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
Expand Down
9 changes: 4 additions & 5 deletions datafusion/core/src/physical_plan/aggregates/no_grouping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use crate::physical_plan::aggregates::{
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use arrow::datatypes::SchemaRef;
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
Expand All @@ -38,7 +37,7 @@ use futures::stream::{Stream, StreamExt};

/// stream struct for aggregation without grouping columns
pub(crate) struct AggregateStream {
stream: BoxStream<'static, ArrowResult<RecordBatch>>,
stream: BoxStream<'static, Result<RecordBatch>>,
schema: SchemaRef,
}

Expand Down Expand Up @@ -112,17 +111,17 @@ impl AggregateStream {
.and_then(|allocated| this.reservation.try_grow(allocated))
{
Ok(_) => continue,
Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
Err(e) => Err(e),
}
}
Some(Err(e)) => Err(e),
None => {
this.finished = true;
let timer = this.baseline_metrics.elapsed_compute().timer();
let result = finalize_aggregation(&this.accumulators, &this.mode)
.map_err(|e| ArrowError::ExternalError(Box::new(e)))
.and_then(|columns| {
RecordBatch::try_new(this.schema.clone(), columns)
.map_err(Into::into)
})
.record_output(&this.baseline_metrics);

Expand All @@ -146,7 +145,7 @@ impl AggregateStream {
}

impl Stream for AggregateStream {
type Item = ArrowResult<RecordBatch>;
type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
Expand Down
14 changes: 4 additions & 10 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,10 @@ use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};

use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use arrow::array::{new_null_array, PrimitiveArray};
use arrow::array::{Array, UInt32Builder};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Schema, UInt32Type};
use arrow::{array::ArrayRef, compute};
use arrow::{
array::{Array, UInt32Builder},
error::{ArrowError, Result as ArrowResult},
};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_expr::Accumulator;
Expand Down Expand Up @@ -226,7 +223,7 @@ impl GroupedHashAggregateStream {
}

impl Stream for GroupedHashAggregateStream {
type Item = ArrowResult<RecordBatch>;
type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
Expand All @@ -252,9 +249,7 @@ impl Stream for GroupedHashAggregateStream {
});

if let Err(e) = result {
return Poll::Ready(Some(Err(
ArrowError::ExternalError(Box::new(e)),
)));
return Poll::Ready(Some(Err(e)));
}
}
// inner had error, return to caller
Expand Down Expand Up @@ -569,7 +564,7 @@ impl std::fmt::Debug for RowAggregationState {

impl GroupedHashAggregateStream {
/// Create a RecordBatch with all group keys and accumulator' states or values.
fn create_batch_from_map(&mut self) -> ArrowResult<Option<RecordBatch>> {
fn create_batch_from_map(&mut self) -> Result<Option<RecordBatch>> {
let skip_items = self.row_group_skip_position;
if skip_items > self.row_aggr_state.group_states.len() {
return Ok(None);
Expand Down Expand Up @@ -624,7 +619,6 @@ impl GroupedHashAggregateStream {
// the intermediate GroupByScalar type was not the same as the
// output
cast(&item, field.data_type())
.map_err(DataFusionError::ArrowError)
}?;
results.push(result);
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/physical_plan/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ impl ExecutionPlan for AnalyzeExec {
Arc::new(type_builder.finish()),
Arc::new(plan_builder.finish()),
],
);
)
.map_err(Into::into);
// again ignore error
tx.send(maybe_batch).await.ok();
});
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ struct CoalesceBatchesStream {
}

impl Stream for CoalesceBatchesStream {
type Item = ArrowResult<RecordBatch>;
type Item = Result<RecordBatch>;

fn poll_next(
mut self: Pin<&mut Self>,
Expand All @@ -200,7 +200,7 @@ impl CoalesceBatchesStream {
fn poll_next_inner(
self: &mut Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<ArrowResult<RecordBatch>>> {
) -> Poll<Option<Result<RecordBatch>>> {
// Get a clone (uses same underlying atomic) as self gets borrowed below
let cloned_time = self.baseline_metrics.elapsed_compute().clone();

Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_plan/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use std::task::Poll;
use futures::Stream;
use tokio::sync::mpsc;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::{datatypes::SchemaRef, error::Result as ArrowResult};

use super::common::AbortOnDropMany;
use super::expressions::PhysicalSortExpr;
Expand Down Expand Up @@ -138,7 +138,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
// least one result in an attempt to maximize
// parallelism.
let (sender, receiver) =
mpsc::channel::<ArrowResult<RecordBatch>>(input_partitions);
mpsc::channel::<Result<RecordBatch>>(input_partitions);

// spawn independent tasks whose resulting streams (of batches)
// are sent to the channel for consumption.
Expand Down Expand Up @@ -185,14 +185,14 @@ impl ExecutionPlan for CoalescePartitionsExec {

struct MergeStream {
schema: SchemaRef,
input: mpsc::Receiver<ArrowResult<RecordBatch>>,
input: mpsc::Receiver<Result<RecordBatch>>,
baseline_metrics: BaselineMetrics,
#[allow(unused)]
drop_helper: AbortOnDropMany<()>,
}

impl Stream for MergeStream {
type Item = ArrowResult<RecordBatch>;
type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
Expand Down
27 changes: 12 additions & 15 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statist
use arrow::compute::concat;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::error::Result as ArrowResult;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::utils::ordering_satisfy;
Expand Down Expand Up @@ -68,7 +67,7 @@ impl SizedRecordBatchStream {
}

impl Stream for SizedRecordBatchStream {
type Item = ArrowResult<RecordBatch>;
type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
Expand All @@ -92,10 +91,7 @@ impl RecordBatchStream for SizedRecordBatchStream {

/// Create a vector of record batches from a stream
pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {
stream
.try_collect::<Vec<_>>()
.await
.map_err(DataFusionError::from)
stream.try_collect::<Vec<_>>().await
}

/// Merge two record batch references into a single record batch.
Expand All @@ -104,15 +100,16 @@ pub fn merge_batches(
first: &RecordBatch,
second: &RecordBatch,
schema: SchemaRef,
) -> ArrowResult<RecordBatch> {
) -> Result<RecordBatch> {
let columns = (0..schema.fields.len())
.map(|index| {
let first_column = first.column(index).as_ref();
let second_column = second.column(index).as_ref();
concat(&[first_column, second_column])
})
.collect::<ArrowResult<Vec<_>>>()?;
RecordBatch::try_new(schema, columns)
.collect::<Result<Vec<_>, ArrowError>>()
.map_err(Into::<DataFusionError>::into)?;
RecordBatch::try_new(schema, columns).map_err(Into::into)
}

/// Merge a slice of record batch references into a single record batch, or
Expand All @@ -121,7 +118,7 @@ pub fn merge_batches(
pub fn merge_multiple_batches(
batches: &[&RecordBatch],
schema: SchemaRef,
) -> ArrowResult<Option<RecordBatch>> {
) -> Result<Option<RecordBatch>> {
Ok(if batches.is_empty() {
None
} else {
Expand All @@ -134,7 +131,8 @@ pub fn merge_multiple_batches(
.collect::<Vec<_>>(),
)
})
.collect::<ArrowResult<Vec<_>>>()?;
.collect::<Result<Vec<_>, ArrowError>>()
.map_err(Into::<DataFusionError>::into)?;
Some(RecordBatch::try_new(schema, columns)?)
})
}
Expand Down Expand Up @@ -190,7 +188,7 @@ fn build_file_list_recurse(
/// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender
pub(crate) fn spawn_execution(
input: Arc<dyn ExecutionPlan>,
output: mpsc::Sender<ArrowResult<RecordBatch>>,
output: mpsc::Sender<Result<RecordBatch>>,
partition: usize,
context: Arc<TaskContext>,
) -> JoinHandle<()> {
Expand All @@ -199,8 +197,7 @@ pub(crate) fn spawn_execution(
Err(e) => {
// If send fails, plan being torn down,
// there is no place to send the error.
let arrow_error = ArrowError::ExternalError(Box::new(e));
output.send(Err(arrow_error)).await.ok();
output.send(Err(e)).await.ok();
debug!(
"Stopping execution: error executing input: {}",
displayable(input.as_ref()).one_line()
Expand Down Expand Up @@ -524,7 +521,7 @@ impl IPCWriter {

/// Finish the writer
pub fn finish(&mut self) -> Result<()> {
self.writer.finish().map_err(DataFusionError::ArrowError)
self.writer.finish().map_err(Into::into)
}

/// Path write to
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ mod tests {
let err = it.next().await.unwrap().unwrap_err().to_string();
assert_eq!(
err,
"Csv error: incorrect number of fields for line 1, expected 14 got 13"
"Arrow error: Csv error: incorrect number of fields for line 1, expected 14 got 13"
);
Ok(())
}
Expand Down
26 changes: 14 additions & 12 deletions datafusion/core/src/physical_plan/file_format/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use std::task::{Context, Poll};
use std::time::Instant;

use arrow::datatypes::SchemaRef;
use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::ScalarValue;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
Expand All @@ -45,7 +46,7 @@ use crate::physical_plan::RecordBatchStream;

/// A fallible future that resolves to a stream of [`RecordBatch`]
pub type FileOpenFuture =
BoxFuture<'static, Result<BoxStream<'static, ArrowResult<RecordBatch>>>>;
BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;

/// Generic API for opening a file using an [`ObjectStore`] and resolving to a
/// stream of [`RecordBatch`]
Expand Down Expand Up @@ -96,7 +97,7 @@ enum FileStreamState {
/// Partitioning column values for the current batch_iter
partition_values: Vec<ScalarValue>,
/// The reader instance
reader: BoxStream<'static, ArrowResult<RecordBatch>>,
reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
},
/// Encountered an error
Error,
Expand Down Expand Up @@ -201,10 +202,7 @@ impl<F: FileOpener> FileStream<F> {
})
}

fn poll_inner(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<ArrowResult<RecordBatch>>> {
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
loop {
match &mut self.state {
FileStreamState::Idle => {
Expand All @@ -230,7 +228,7 @@ impl<F: FileOpener> FileStream<F> {
}
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e.into())));
return Poll::Ready(Some(Err(e)));
}
}
}
Expand All @@ -249,7 +247,7 @@ impl<F: FileOpener> FileStream<F> {
}
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e.into())));
return Poll::Ready(Some(Err(e)));
}
},
FileStreamState::Scan {
Expand All @@ -260,7 +258,11 @@ impl<F: FileOpener> FileStream<F> {
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
let result = result
.and_then(|b| self.pc_projector.project(b, partition_values))
.and_then(|b| {
self.pc_projector
.project(b, partition_values)
.map_err(|e| ArrowError::ExternalError(e.into()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed, project already returns ArrowError AFAICT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is project from DataFusion, it returns DataFusionError

})
.map(|batch| match &mut self.remain {
Some(remain) => {
if *remain > batch.num_rows() {
Expand All @@ -280,7 +282,7 @@ impl<F: FileOpener> FileStream<F> {
self.state = FileStreamState::Error
}
self.file_stream_metrics.time_scanning_total.start();
return Poll::Ready(Some(result));
return Poll::Ready(Some(result.map_err(Into::into)));
}
None => {
self.file_stream_metrics.time_scanning_until_data.stop();
Expand All @@ -297,7 +299,7 @@ impl<F: FileOpener> FileStream<F> {
}

impl<F: FileOpener> Stream for FileStream<F> {
type Item = ArrowResult<RecordBatch>;
type Item = Result<RecordBatch>;

fn poll_next(
mut self: Pin<&mut Self>,
Expand Down
Loading