fix parquet file format adapted projection by providing the proper schema to the RecordBatch
Igosuki committed Feb 3, 2022
1 parent 80078b5 commit f2debbb
Showing 2 changed files with 14 additions and 27 deletions.
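In short, the reader now builds each RecordBatch with the schema reported by the Parquet record reader itself, and only afterwards lets the SchemaAdapter project it onto the table schema. The sketch below illustrates why that ordering matters; it uses the public arrow-rs API rather than this crate's internal record_batch/field_util types, so the names and signatures here are illustrative only, not the code being changed.

// Minimal illustration (public arrow-rs API, not this crate's internal types):
// RecordBatch::try_new validates the supplied columns against the schema, so the
// schema passed in has to describe exactly the columns actually read from the file.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    // One column ("a") actually decoded from the file.
    let file_cols = vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef];

    // Schema of what the reader really produced.
    let read_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));

    // Projected table schema: declares "a" and "b".
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Int64, true),
    ]));

    // Works: the schema matches the columns one-to-one.
    assert!(RecordBatch::try_new(read_schema, file_cols.clone()).is_ok());

    // Fails: two fields declared, one column supplied. Filling in the missing
    // column is the SchemaAdapter's job, applied after the batch is built.
    assert!(RecordBatch::try_new(table_schema, file_cols).is_err());
}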
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/file_format/mod.rs
@@ -249,7 +249,7 @@ impl SchemaAdapter {

        let projected_schema = Arc::new(self.table_schema.clone().project(projections)?);

-        let merged_batch = RecordBatch::try_new(projected_schema, cols)?;
+        let merged_batch = RecordBatch::try_new(projected_schema, cols.clone())?;

        Ok(merged_batch)
    }
39 changes: 13 additions & 26 deletions datafusion/src/physical_plan/file_format/parquet.rs
@@ -25,7 +25,7 @@ use std::{any::Any, convert::TryInto};

use crate::datasource::object_store::ObjectStore;
use crate::datasource::PartitionedFile;
-use crate::field_util::{FieldExt, SchemaExt};
+use crate::field_util::SchemaExt;
use crate::record_batch::RecordBatch;
use crate::{
    error::{DataFusionError, Result},
@@ -40,7 +40,6 @@ use crate::{
    },
    scalar::ScalarValue,
};
-use arrow::array::new_null_array;
use arrow::error::ArrowError;
use arrow::{
    array::ArrayRef,
@@ -463,7 +462,8 @@ fn read_partition(
    limit: Option<usize>,
    mut partition_column_projector: PartitionColumnProjector,
) -> Result<()> {
-    for partitioned_file in partition {
+    let mut total_rows = 0;
+    'outer: for partitioned_file in partition {
        debug!("Reading file {}", &partitioned_file.file_meta.path());

        let file_metrics = ParquetFileMetrics::new(
@@ -494,32 +494,16 @@
            )));
        }

-        let read_schema = Arc::new(file_schema.clone().project(&adapted_projections)?);
-
-        let total_cols = &file_schema.fields().len();
+        let read_schema = record_reader.schema().clone();
        for chunk_r in record_reader {
            match chunk_r {
                Ok(chunk) => {
-                    let mut cols: Vec<ArrayRef> = Vec::with_capacity(*total_cols);
-                    let batch_cols = chunk.columns().to_vec();
-
-                    for field_idx in projection {
-                        let merged_field = &file_schema.fields()[*field_idx];
-                        if let Some((batch_idx, _name)) =
-                            read_schema.column_with_name(merged_field.name())
-                        {
-                            cols.push(batch_cols[batch_idx].clone());
-                        } else {
-                            cols.push(
-                                new_null_array(
-                                    merged_field.data_type().clone(),
-                                    chunk.len(),
-                                )
-                                .into(),
-                            )
-                        }
-                    }
-                    let batch = RecordBatch::try_new(read_schema.clone(), cols)?;
+                    total_rows += chunk.len();
+
+                    let batch = RecordBatch::try_new(
+                        read_schema.clone(),
+                        chunk.columns().to_vec(),
+                    )?;

                    let adapted_batch = schema_adapter.adapt_batch(batch, projection)?;

@@ -528,6 +512,9 @@
                    response_tx
                        .blocking_send(proj_batch)
                        .map_err(|x| DataFusionError::Execution(format!("{}", x)))?;
+                    if limit.map(|l| total_rows >= l).unwrap_or(false) {
+                        break 'outer;
+                    }
                }
                Err(e) => {
                    let err_msg =
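The other behavioural piece of the commit is the limit handling: row counts are accumulated across chunks and files, and a labelled break leaves the outer per-file loop as soon as the limit is satisfied. A stripped-down sketch of just that control flow, with hypothetical data in place of Parquet files and chunks:

// Stripped-down sketch of the limit handling above: hypothetical helper and data,
// no Parquet or channels, but the same labelled-break pattern.
fn take_until_limit(files: Vec<Vec<usize>>, limit: Option<usize>) -> Vec<usize> {
    let mut emitted = Vec::new();
    let mut total_rows = 0;

    'outer: for chunks in files {
        for chunk_len in chunks {
            total_rows += chunk_len;
            emitted.push(chunk_len);

            // Same predicate as in read_partition: stop once enough rows were sent.
            if limit.map(|l| total_rows >= l).unwrap_or(false) {
                break 'outer;
            }
        }
    }
    emitted
}

fn main() {
    // Two "files" with chunk sizes [3, 3] and [5]; a limit of 5 stops inside file 1.
    assert_eq!(take_until_limit(vec![vec![3, 3], vec![5]], Some(5)), vec![3, 3]);
    // No limit: everything is read.
    assert_eq!(take_until_limit(vec![vec![3, 3], vec![5]], None), vec![3, 3, 5]);
}

As in the diff, the check runs after the current chunk has already been emitted, so the loop treats the limit as "at least this many rows" rather than an exact cut-off.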