Implement Hive-Style Partitioned Write Support (#7801)
* squash

* rebase

* add issue link

* docs and cargo fmt

* Apply suggestions from code review

apply review suggestions

Co-authored-by: Andrew Lamb <[email protected]>

* refactor remove_partition_by_columns

* make part cols unambiguous

* cargo fixes

* fix for nested columns

---------

Co-authored-by: Andrew Lamb <[email protected]>
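
For orientation, a minimal sketch of exercising the feature this commit adds through the SQL interface. The table name, location, and column names are illustrative only, and the exact DDL accepted may differ slightly from this sketch:

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Register a partitioned listing table; the location and column names
    // here are hypothetical.
    ctx.sql(
        "CREATE EXTERNAL TABLE t (a BIGINT, b VARCHAR) \
         STORED AS PARQUET PARTITIONED BY (b) LOCATION '/tmp/t/'",
    )
    .await?;

    // With this commit, the insert is demuxed into hive-style directories,
    // e.g. /tmp/t/b=x/ and /tmp/t/b=y/, and b is dropped from the data files.
    ctx.sql("INSERT INTO t VALUES (1, 'x'), (2, 'y')")
        .await?
        .collect()
        .await?;

    Ok(())
}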
devinjdangelo and alamb authored Oct 20, 2023
1 parent 113a7bd commit e17ca27
Showing 12 changed files with 1,305 additions and 859 deletions.
5 changes: 0 additions & 5 deletions datafusion/common/src/config.rs
@@ -261,11 +261,6 @@ config_namespace! {
/// number of rows written is not roughly divisible by the soft max
pub soft_max_rows_per_output_file: usize, default = 50000000

/// This is the maximum number of output files being written
/// in parallel. Higher values can potentially give faster write
/// performance at the cost of higher peak memory consumption.
pub max_parallel_ouput_files: usize, default = 8

/// This is the maximum number of RecordBatches buffered
/// for each output file being worked. Higher values can potentially
/// give faster write performance at the cost of higher peak
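
These write-related options can be tuned per session. A brief sketch, assuming the keys live under the datafusion.execution namespace; the exact key names here are assumptions, so consult config.rs for the authoritative list:

use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Assumed option keys, mirroring the doc comments in the hunk above.
    let config = SessionConfig::new()
        .set_usize("datafusion.execution.soft_max_rows_per_output_file", 1_000_000)
        .set_usize("datafusion.execution.max_buffered_batches_per_output_file", 2);
    let _ctx = SessionContext::new_with_config(config);
}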
25 changes: 15 additions & 10 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -23,7 +23,18 @@ use std::fmt
use std::fmt::Debug;
use std::sync::Arc;

use super::write::{stateless_append_all, stateless_multipart_put};
use arrow_array::RecordBatch;
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};

use bytes::{Buf, Bytes};
use datafusion_physical_plan::metrics::MetricsSet;
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};

use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
@@ -39,17 +50,8 @@ use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::{self, datatypes::SchemaRef};
use arrow_array::RecordBatch;
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};

/// Character Separated Value `FileFormat` implementation.
#[derive(Debug)]
@@ -485,6 +487,9 @@ impl CsvSink {
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
if !self.config.table_partition_cols.is_empty() {
return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
}
let writer_options = self.config.file_type_writer_options.try_into_csv()?;
let (builder, compression) =
(&writer_options.writer_options, &writer_options.compression);
43 changes: 28 additions & 15 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -23,32 +23,41 @@ use std::fmt::Debug;
use std::io::BufReader;
use std::sync::Arc;

use super::write::{stateless_append_all, stateless_multipart_put};
use super::{FileFormat, FileScanConfig};
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use arrow::json;
use arrow::json::reader::infer_json_schema_from_iterator;
use arrow::json::reader::ValueIter;
use arrow_array::RecordBatch;
use async_trait::async_trait;
use bytes::Buf;

use bytes::Bytes;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::PhysicalSortRequirement;
use datafusion_physical_plan::ExecutionPlan;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

use crate::datasource::physical_plan::FileGroupDisplay;
use crate::physical_plan::insert::DataSink;
use crate::physical_plan::insert::FileSinkExec;
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};

use super::write::orchestration::{stateless_append_all, stateless_multipart_put};

use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig, NdJsonExec};
use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, Statistics,
};

use arrow::datatypes::{Schema, SchemaRef};
use arrow::json;
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
use arrow_array::RecordBatch;
use datafusion_common::{not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

/// New line delimited JSON `FileFormat` implementation.
#[derive(Debug)]
pub struct JsonFormat {
@@ -258,6 +267,10 @@ impl JsonSink {
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
if !self.config.table_partition_cols.is_empty() {
return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
}

let writer_options = self.config.file_type_writer_options.try_into_json()?;
let compression = &writer_options.compression;

47 changes: 41 additions & 6 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -23,10 +23,15 @@ use std::fmt::Debug;
use std::io::Write;
use std::sync::Arc;

use super::write::{create_writer, start_demuxer_task, AbortableWrite, FileWriterMode};
use super::write::demux::start_demuxer_task;
use super::write::{create_writer, AbortableWrite, FileWriterMode};
use super::{FileFormat, FileScanConfig};

use crate::arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
};
use crate::arrow::datatypes::DataType;
use crate::config::ConfigOptions;

use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::get_col_stats;
use crate::datasource::physical_plan::{
@@ -42,8 +47,7 @@ use crate::physical_plan::{
Statistics,
};

use arrow::array::{BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array};
use arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
use arrow::datatypes::{Fields, Schema, SchemaRef};
use datafusion_common::stats::Precision;
use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
@@ -604,6 +608,31 @@ impl ParquetSink {
Self { config }
}

/// Converts table schema to writer schema, which may differ in the case
/// of hive style partitioning where some columns are removed from the
/// underlying files.
fn get_writer_schema(&self) -> Arc<Schema> {
if !self.config.table_partition_cols.is_empty() {
let schema = self.config.output_schema();
let partition_names: Vec<_> = self
.config
.table_partition_cols
.iter()
.map(|(s, _)| s)
.collect();
Arc::new(Schema::new(
schema
.fields()
.iter()
.filter(|f| !partition_names.contains(&f.name()))
.map(|f| (**f).clone())
.collect::<Vec<_>>(),
))
} else {
self.config.output_schema().clone()
}
}
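
For illustration only, the schema filtering above applied to a toy schema; the column names are hypothetical, and this snippet mirrors the method rather than calling it:

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // A table schema with a hive partition column `date`.
    let table_schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("date", DataType::Utf8, false),
    ]);
    let partition_names = ["date".to_string()];
    // Mirror of get_writer_schema: drop partition columns before writing.
    let writer_schema = Arc::new(Schema::new(
        table_schema
            .fields()
            .iter()
            .filter(|f| !partition_names.contains(f.name()))
            .map(|f| (**f).clone())
            .collect::<Vec<_>>(),
    ));
    // Only `id` reaches the data file; `date` is encoded in the directory path.
    assert_eq!(writer_schema.fields().len(), 1);
}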

/// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore
/// AsyncArrowWriters are used when individual parquet file serialization is not parallelized
async fn create_async_arrow_writer(
@@ -631,7 +660,7 @@ impl ParquetSink {
.map_err(DataFusionError::ObjectStore)?;
let writer = AsyncArrowWriter::try_new(
multipart_writer,
self.config.output_schema.clone(),
self.get_writer_schema(),
10485760,
Some(parquet_props),
)?;
@@ -721,10 +750,16 @@ impl DataSink for ParquetSink {
.map(|r| r as u64);
}

let part_col = if !self.config.table_partition_cols.is_empty() {
Some(self.config.table_partition_cols.clone())
} else {
None
};

let (demux_task, mut file_stream_rx) = start_demuxer_task(
data,
context,
None,
part_col,
self.config.table_paths[0].clone(),
"parquet".into(),
self.config.single_file_output,
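
For reference, when part_col is passed the demuxer groups rows and writes each group under hive-style column=value path segments. A standalone sketch of that layout; the helper below is hypothetical and not part of this commit:

// Hypothetical helper illustrating hive-style partition paths; the real
// logic lives in datasource::file_format::write::demux.
fn hive_partition_path(base: &str, parts: &[(&str, &str)], file_name: &str) -> String {
    let dirs: Vec<String> = parts
        .iter()
        .map(|(col, val)| format!("{col}={val}"))
        .collect();
    format!("{base}/{}/{file_name}", dirs.join("/"))
}

fn main() {
    assert_eq!(
        hive_partition_path("/tmp/t", &[("year", "2023"), ("month", "10")], "part-0.parquet"),
        "/tmp/t/year=2023/month=10/part-0.parquet"
    );
}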