Multiple files per partitions for CSV Avro Json #1138

Merged · 3 commits · Oct 20, 2021
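In short: this PR switches the CSV, Avro, and NdJson scan nodes from a flat `Vec<PartitionedFile>` to grouped `Vec<Vec<PartitionedFile>>` file lists, so one output partition can scan several files, matching what the Parquet scan already supports. A minimal sketch of the idea (all names are illustrative stand-ins, not code from this PR):

```rust
// Illustrative stand-in only; the real PartitionedFile carries file metadata.
struct PartitionedFile {
    path: String,
}

/// Each inner Vec is one execution partition; all files in a group are
/// read sequentially by that partition's stream.
fn output_partition_count(file_groups: &[Vec<PartitionedFile>]) -> usize {
    // Mirrors `Partitioning::UnknownPartitioning(self.file_groups.len())` below.
    file_groups.len()
}

fn main() {
    let groups = vec![
        vec![PartitionedFile { path: "a.csv".into() }],
        vec![
            PartitionedFile { path: "b.csv".into() },
            PartitionedFile { path: "c.csv".into() },
        ],
    ];
    // Three files, but only two partitions: the second group is read
    // sequentially by a single partition.
    assert_eq!(output_partition_count(&groups), 2);
}
```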
Changes from all commits
4 changes: 2 additions & 2 deletions ballista/rust/core/proto/ballista.proto
@@ -615,7 +615,7 @@ message ParquetScanExecNode {
}

message CsvScanExecNode {
-repeated PartitionedFile files = 1;
+repeated FileGroup file_groups = 1;
Schema schema = 2;
bool has_header = 3;
uint32 batch_size = 4;
@@ -626,7 +626,7 @@ message CsvScanExecNode {
}

message AvroScanExecNode {
-repeated PartitionedFile files = 1;
+repeated FileGroup file_groups = 1;
Schema schema = 2;
uint32 batch_size = 4;
repeated uint32 projection = 6;
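The `FileGroup` message referenced above is defined elsewhere in ballista.proto and is not part of this hunk; a plausible definition, consistent with how the Rust side consumes it below, would be:

```proto
// Assumed shape, not shown in this diff: one group of files scanned
// sequentially by a single partition.
message FileGroup {
  repeated PartitionedFile files = 1;
}
```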
12 changes: 6 additions & 6 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -125,10 +125,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {

Ok(Arc::new(CsvExec::new(
Arc::new(LocalFileSystem {}),
-scan.files
+scan.file_groups
.iter()
-.map(|f| f.into())
-.collect::<Vec<PartitionedFile>>(),
+.map(|p| p.into())
+.collect::<Vec<Vec<PartitionedFile>>>(),
statistics,
schema,
scan.has_header,
@@ -165,10 +165,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {

Ok(Arc::new(AvroExec::new(
Arc::new(LocalFileSystem {}),
-scan.files
+scan.file_groups
.iter()
-.map(|f| f.into())
-.collect::<Vec<PartitionedFile>>(),
+.map(|p| p.into())
+.collect::<Vec<Vec<PartitionedFile>>>(),
statistics,
schema,
Some(projection),
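The `.map(|p| p.into())` calls above lean on a conversion between the generated protobuf `FileGroup` and `Vec<PartitionedFile>`; those impls are not shown in this diff. A self-contained sketch of the pattern, with hypothetical stand-in types:

```rust
// Stand-ins for the generated protobuf structs and the DataFusion types;
// the real conversion impls live in ballista's serde module (assumption).
struct ProtoFile { path: String }
struct ProtoFileGroup { files: Vec<ProtoFile> }
struct PartitionedFile { path: String }

impl From<&ProtoFile> for PartitionedFile {
    fn from(f: &ProtoFile) -> Self {
        PartitionedFile { path: f.path.clone() }
    }
}

// One proto FileGroup becomes one partition's file list.
impl From<&ProtoFileGroup> for Vec<PartitionedFile> {
    fn from(g: &ProtoFileGroup) -> Self {
        g.files.iter().map(|f| f.into()).collect()
    }
}

// Reverse direction, mirroring `.map(|p| p.as_slice().into())` in
// to_proto.rs below.
impl From<&[PartitionedFile]> for ProtoFileGroup {
    fn from(group: &[PartitionedFile]) -> Self {
        ProtoFileGroup {
            files: group
                .iter()
                .map(|f| ProtoFile { path: f.path.clone() })
                .collect(),
        }
    }
}

fn main() {
    let groups = vec![ProtoFileGroup {
        files: vec![ProtoFile { path: "a.csv".into() }],
    }];
    // Mirrors `scan.file_groups.iter().map(|p| p.into()).collect(...)`.
    let decoded: Vec<Vec<PartitionedFile>> =
        groups.iter().map(|p| p.into()).collect();
    assert_eq!(decoded[0][0].path, "a.csv");
}
```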
22 changes: 12 additions & 10 deletions ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -244,14 +244,15 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
))),
})
} else if let Some(exec) = plan.downcast_ref::<CsvExec>() {
+let file_groups = exec
+.file_groups()
+.iter()
+.map(|p| p.as_slice().into())
+.collect();
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::CsvScan(
protobuf::CsvScanExecNode {
-files: exec
-.files()
-.iter()
-.map(|f| f.into())
-.collect::<Vec<protobuf::PartitionedFile>>(),
+file_groups,
statistics: Some((&exec.statistics()).into()),
limit: exec
.limit()
@@ -301,14 +302,15 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
)),
})
} else if let Some(exec) = plan.downcast_ref::<AvroExec>() {
+let file_groups = exec
+.file_groups()
+.iter()
+.map(|p| p.as_slice().into())
+.collect();
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::AvroScan(
protobuf::AvroScanExecNode {
-files: exec
-.files()
-.iter()
-.map(|f| f.into())
-.collect::<Vec<protobuf::PartitionedFile>>(),
+file_groups,
statistics: Some((&exec.statistics()).into()),
limit: exec
.limit()
3 changes: 1 addition & 2 deletions datafusion/src/datasource/file_format/avro.rs
@@ -64,8 +64,7 @@ impl FileFormat for AvroFormat {
) -> Result<Arc<dyn ExecutionPlan>> {
let exec = AvroExec::new(
conf.object_store,
-// flattening this for now because CsvExec does not support partitioning yet
Contributor (review comment on the removed line above): ❤️
-conf.files.into_iter().flatten().collect::<Vec<_>>(),
+conf.files,
conf.statistics,
conf.schema,
conf.projection,
3 changes: 1 addition & 2 deletions datafusion/src/datasource/file_format/csv.rs
@@ -126,8 +126,7 @@ impl FileFormat for CsvFormat {
) -> Result<Arc<dyn ExecutionPlan>> {
let exec = CsvExec::new(
conf.object_store,
-// flattening this for now because CsvExec does not support partitioning yet
-conf.files.into_iter().flatten().collect::<Vec<_>>(),
+conf.files,
conf.statistics,
conf.schema,
self.has_header,
3 changes: 1 addition & 2 deletions datafusion/src/datasource/file_format/json.rs
@@ -96,8 +96,7 @@ impl FileFormat for JsonFormat {
) -> Result<Arc<dyn ExecutionPlan>> {
let exec = NdJsonExec::new(
conf.object_store,
-// flattening this for now because NdJsonExec does not support partitioning yet
-conf.files.into_iter().flatten().collect::<Vec<_>>(),
+conf.files,
conf.statistics,
conf.schema,
conf.projection,
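With the flattening gone, all three `FileFormat` implementations forward `conf.files` untouched, which assumes the scan configuration already carries one file list per target partition. How the planner splits files into groups is not part of this diff; a purely illustrative round-robin split might look like:

```rust
// Hypothetical grouping helper; DataFusion's actual strategy is not
// shown in this PR, round-robin is used here only for illustration.
fn split_into_groups(files: Vec<String>, partitions: usize) -> Vec<Vec<String>> {
    let mut groups: Vec<Vec<String>> = vec![Vec::new(); partitions];
    for (i, file) in files.into_iter().enumerate() {
        groups[i % partitions].push(file);
    }
    // Drop empty groups so no partition is created with nothing to scan.
    groups.retain(|g| !g.is_empty());
    groups
}

fn main() {
    let groups = split_into_groups(
        vec!["a.json".into(), "b.json".into(), "c.json".into()],
        2,
    );
    // a and c land in group 0, b in group 1.
    assert_eq!(groups.len(), 2);
    assert_eq!(groups[0], ["a.json", "c.json"]);
}
```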
158 changes: 49 additions & 109 deletions datafusion/src/physical_plan/file_format/avro.rs
@@ -16,79 +16,74 @@
// under the License.

//! Execution plan for reading line-delimited Avro files
+#[cfg(feature = "avro")]
+use crate::avro_to_arrow;
use crate::datasource::object_store::ObjectStore;
use crate::datasource::PartitionedFile;
use crate::error::{DataFusionError, Result};
-#[cfg(feature = "avro")]
-use crate::physical_plan::RecordBatchStream;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use arrow::datatypes::{Schema, SchemaRef};
#[cfg(feature = "avro")]
-use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use arrow::error::ArrowError;

use async_trait::async_trait;
-#[cfg(feature = "avro")]
-use futures::Stream;
use std::any::Any;
use std::sync::Arc;

#[cfg(feature = "avro")]
-use std::{
-io::Read,
-pin::Pin,
-task::{Context, Poll},
-};
+use super::file_stream::{BatchIter, FileStream};

/// Execution plan for scanning Avro data source
#[derive(Debug, Clone)]
pub struct AvroExec {
object_store: Arc<dyn ObjectStore>,
-files: Vec<PartitionedFile>,
+file_groups: Vec<Vec<PartitionedFile>>,
statistics: Statistics,
-schema: SchemaRef,
+file_schema: SchemaRef,
projection: Option<Vec<usize>>,
projected_schema: SchemaRef,
batch_size: usize,
limit: Option<usize>,
}

impl AvroExec {
-/// Create a new JSON reader execution plan provided file list and schema
-/// TODO: support partitiond file list (Vec<Vec<PartitionedFile>>)
+/// Create a new Avro reader execution plan provided file list and schema
pub fn new(
object_store: Arc<dyn ObjectStore>,
-files: Vec<PartitionedFile>,
+file_groups: Vec<Vec<PartitionedFile>>,
statistics: Statistics,
-schema: SchemaRef,
+file_schema: SchemaRef,
projection: Option<Vec<usize>>,
batch_size: usize,
limit: Option<usize>,
) -> Self {
let projected_schema = match &projection {
-None => Arc::clone(&schema),
+None => Arc::clone(&file_schema),
Some(p) => Arc::new(Schema::new(
-p.iter().map(|i| schema.field(*i).clone()).collect(),
+p.iter().map(|i| file_schema.field(*i).clone()).collect(),
)),
};

Self {
object_store,
-files,
+file_groups,
statistics,
-schema,
+file_schema,
projection,
projected_schema,
batch_size,
limit,
}
}
/// List of data files
-pub fn files(&self) -> &[PartitionedFile] {
-&self.files
+pub fn file_groups(&self) -> &[Vec<PartitionedFile>] {
+&self.file_groups
}
/// The schema before projection
pub fn file_schema(&self) -> &SchemaRef {
-&self.schema
+&self.file_schema
}
/// Optional projection for which columns to load
pub fn projection(&self) -> &Option<Vec<usize>> {
@@ -115,7 +110,7 @@ impl ExecutionPlan for AvroExec {
}

fn output_partitioning(&self) -> Partitioning {
-Partitioning::UnknownPartitioning(self.files.len())
+Partitioning::UnknownPartitioning(self.file_groups.len())
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -145,26 +140,39 @@ impl ExecutionPlan for AvroExec {

#[cfg(feature = "avro")]
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-let file = self
-.object_store
-.file_reader(self.files[partition].file_meta.sized_file.clone())?
-.sync_reader()?;

let proj = self.projection.as_ref().map(|p| {
p.iter()
-.map(|col_idx| self.schema.field(*col_idx).name())
+.map(|col_idx| self.file_schema.field(*col_idx).name())
.cloned()
.collect()
});

-let avro_reader = crate::avro_to_arrow::Reader::try_new(
-file,
-self.schema(),
-self.batch_size,
-proj,
-)?;
+let batch_size = self.batch_size;
+let file_schema = Arc::clone(&self.file_schema);

+// The avro reader cannot limit the number of records, so `remaining` is ignored.
+let fun = move |file, _remaining: &Option<usize>| {
+let reader_res = avro_to_arrow::Reader::try_new(
+file,
+Arc::clone(&file_schema),
+batch_size,
+proj.clone(),
+);
+match reader_res {
+Ok(r) => Box::new(r) as BatchIter,
+Err(e) => Box::new(
+vec![Err(ArrowError::ExternalError(Box::new(e)))].into_iter(),
+),
+}
+};

-Ok(Box::pin(AvroStream::new(avro_reader, self.limit)))
+Ok(Box::pin(FileStream::new(
+Arc::clone(&self.object_store),
+self.file_groups[partition].clone(),
+fun,
+Arc::clone(&self.projected_schema),
+self.limit,
+)))
}
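The rewritten `execute` no longer opens a single file eagerly: it hands `FileStream` a closure that turns each opened file into an iterator of batches, and `FileStream` walks the partition's file group, enforcing `limit` across file boundaries. A sketch of the contract as used above (simplified; the real definitions live in this PR's `file_stream` module and may differ):

```rust
// Simplified sketch of the types the closure above is built against.
use std::io::Read;

use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};

// Each opened file becomes a plain iterator of record batches.
type BatchIter = Box<dyn Iterator<Item = ArrowResult<RecordBatch>> + Send + Sync>;

// The reader-factory closure receives the opened file plus the number of
// rows still allowed by `limit`. The Avro closure ignores the remaining
// count because avro_to_arrow::Reader cannot stop mid-file; FileStream
// itself truncates the final batch once the limit is reached (assumption
// based on how `fun` and `self.limit` are wired up above).
type ReaderFactory =
    Box<dyn Fn(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter + Send + Sync>;
```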

fn fmt_as(
@@ -176,12 +184,8 @@ impl ExecutionPlan for AvroExec {
DisplayFormatType::Default => {
write!(
f,
"AvroExec: files=[{}], batch_size={}, limit={:?}",
self.files
.iter()
.map(|f| f.file_meta.path())
.collect::<Vec<_>>()
.join(", "),
"AvroExec: files={}, batch_size={}, limit={:?}",
super::FileGroupsDisplay(&self.file_groups),
self.batch_size,
self.limit,
)
@@ -194,70 +198,6 @@ impl ExecutionPlan for AvroExec {
}
}
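`super::FileGroupsDisplay` is a display helper added elsewhere in this PR, so its exact output format is not visible here. An illustrative stand-in that renders nested groups compactly might look like:

```rust
use std::fmt;

// Illustrative stand-in for the real FileGroupsDisplay helper; the exact
// formatting in the PR may differ.
struct FileGroupsDisplay<'a>(&'a [Vec<String>]);

impl fmt::Display for FileGroupsDisplay<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let groups = self
            .0
            .iter()
            .map(|group| format!("[{}]", group.join(", ")))
            .collect::<Vec<_>>()
            .join(", ");
        write!(f, "[{}]", groups)
    }
}

fn main() {
    let groups = vec![
        vec!["a.avro".to_string()],
        vec!["b.avro".to_string(), "c.avro".to_string()],
    ];
    // Prints: AvroExec: files=[[a.avro], [b.avro, c.avro]]
    println!("AvroExec: files={}", FileGroupsDisplay(&groups));
}
```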

-#[cfg(feature = "avro")]
-struct AvroStream<'a, R: Read> {
-reader: crate::avro_to_arrow::Reader<'a, R>,
-remain: Option<usize>,
-}
-
-#[cfg(feature = "avro")]
-impl<'a, R: Read> AvroStream<'a, R> {
-fn new(reader: crate::avro_to_arrow::Reader<'a, R>, limit: Option<usize>) -> Self {
-Self {
-reader,
-remain: limit,
-}
-}
-}
-
-#[cfg(feature = "avro")]
-impl<R: Read + Unpin> Stream for AvroStream<'_, R> {
-type Item = ArrowResult<RecordBatch>;
-
-fn poll_next(
-mut self: Pin<&mut Self>,
-_cx: &mut Context<'_>,
-) -> Poll<Option<Self::Item>> {
-if let Some(remain) = self.remain.as_mut() {
-if *remain < 1 {
-return Poll::Ready(None);
-}
-}
-
-Poll::Ready(match self.reader.next() {
-Ok(Some(item)) => {
-if let Some(remain) = self.remain.as_mut() {
-if *remain >= item.num_rows() {
-*remain -= item.num_rows();
-Some(Ok(item))
-} else {
-let len = *remain;
-*remain = 0;
-Some(Ok(RecordBatch::try_new(
-item.schema(),
-item.columns()
-.iter()
-.map(|column| column.slice(0, len))
-.collect(),
-)?))
-}
-} else {
-Some(Ok(item))
-}
-}
-Ok(None) => None,
-Err(err) => Some(Err(err)),
-})
-}
-}
-
-#[cfg(feature = "avro")]
-impl<R: Read + Unpin> RecordBatchStream for AvroStream<'_, R> {
-fn schema(&self) -> SchemaRef {
-self.reader.schema()
-}
-}

#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {
@@ -278,9 +218,9 @@ mod tests {
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
let avro_exec = AvroExec::new(
Arc::new(LocalFileSystem {}),
-vec![PartitionedFile {
+vec![vec![PartitionedFile {
file_meta: local_file_meta(filename.clone()),
-}],
+}]],
Statistics::default(),
AvroFormat {}
.infer_schema(local_object_reader_stream(vec![filename]))