diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 7d393d9129dd..650f8c844eda 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Apache Arrow format abstractions
+//! [`ArrowFormat`]: Apache Arrow [`FileFormat`] abstractions
 //!
 //! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
 
@@ -58,6 +58,13 @@ use super::file_compression_type::FileCompressionType;
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
 
+/// Initial writing buffer size. Note this is just a size hint for efficiency. It
+/// will grow beyond the set value if needed.
+const INITIAL_BUFFER_BYTES: usize = 1048576;
+
+/// If the buffered Arrow data exceeds this size, it is flushed to object store
+const BUFFER_FLUSH_BYTES: usize = 1024000;
+
 /// Arrow `FileFormat` implementation.
 #[derive(Default, Debug)]
 pub struct ArrowFormat;
@@ -239,7 +246,7 @@ impl DataSink for ArrowFileSink {
             IpcWriteOptions::try_new(64, false, arrow_ipc::MetadataVersion::V5)?
                 .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
         while let Some((path, mut rx)) = file_stream_rx.recv().await {
-            let shared_buffer = SharedBuffer::new(1048576);
+            let shared_buffer = SharedBuffer::new(INITIAL_BUFFER_BYTES);
             let mut arrow_writer = arrow_ipc::writer::FileWriter::try_new_with_options(
                 shared_buffer.clone(),
                 &self.get_writer_schema(),
@@ -257,7 +264,7 @@ impl DataSink for ArrowFileSink {
                     row_count += batch.num_rows();
                     arrow_writer.write(&batch)?;
                     let mut buff_to_flush = shared_buffer.buffer.try_lock().unwrap();
-                    if buff_to_flush.len() > 1024000 {
+                    if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                         object_store_writer
                             .write_all(buff_to_flush.as_slice())
                             .await?;
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index a24a28ad6fdd..6d424bf0b28f 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Apache Avro format abstractions
+//! [`AvroFormat`] Apache Avro [`FileFormat`] abstractions
 
 use std::any::Any;
 use std::sync::Arc;
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index df6689af6b73..4033bcd3b557 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! CSV format abstractions
+//! [`CsvFormat`], Comma Separated Value (CSV) [`FileFormat`] abstractions
 
 use std::any::Any;
 use std::collections::HashSet;
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 9893a1db45de..fcb1d5f8e527 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Line delimited JSON format abstractions
+//! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions
 
 use std::any::Any;
 use std::fmt;
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 0c813b6ccbf0..7044acccd6dc 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Parquet format abstractions
+//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions
 
 use arrow_array::RecordBatch;
 use async_trait::async_trait;
@@ -75,6 +75,17 @@ use crate::physical_plan::{
     Statistics,
 };
 
+/// Size of the buffer for [`AsyncArrowWriter`].
+const PARQUET_WRITER_BUFFER_SIZE: usize = 10485760;
+
+/// Initial writing buffer size. Note this is just a size hint for efficiency. It
+/// will grow beyond the set value if needed.
+const INITIAL_BUFFER_BYTES: usize = 1048576;
+
+/// When writing parquet files in parallel, if the buffered Parquet data exceeds
+/// this size, it is flushed to object store
+const BUFFER_FLUSH_BYTES: usize = 1024000;
+
 /// The Apache Parquet `FileFormat` implementation
 ///
 /// Note it is recommended these are instead configured on the [`ConfigOptions`]
@@ -680,7 +691,7 @@ impl ParquetSink {
         let writer = AsyncArrowWriter::try_new(
             multipart_writer,
             self.get_writer_schema(),
-            10485760,
+            PARQUET_WRITER_BUFFER_SIZE,
             Some(parquet_props),
         )?;
         Ok(writer)
@@ -1004,7 +1015,7 @@ async fn concatenate_parallel_row_groups(
     writer_props: Arc<WriterProperties>,
     mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
 ) -> Result<FileMetaData> {
-    let merged_buff = SharedBuffer::new(1048576);
+    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
 
     let schema_desc = arrow_to_parquet_schema(schema.as_ref())?;
     let mut parquet_writer = SerializedFileWriter::new(
@@ -1025,7 +1036,7 @@
         for chunk in serialized_columns {
             chunk.append_to_row_group(&mut rg_out)?;
             let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
-            if buff_to_flush.len() > 1024000 {
+            if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                 object_store_writer
                     .write_all(buff_to_flush.as_slice())
                     .await?;
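All of the touched hunks share one write-buffering pattern, which the new constants name instead of repeating the magic numbers `1048576` and `1024000`: serialized bytes accumulate in an in-memory buffer (sized with `INITIAL_BUFFER_BYTES` as a hint), and once the buffer passes `BUFFER_FLUSH_BYTES` its contents are pushed to the object store writer and cleared. The sketch below illustrates that pattern in isolation; it is not DataFusion's actual `DataSink` code. `ThresholdWriter` is a hypothetical type, and a synchronous `std::io::Write` sink stands in for the async object store writer.

```rust
use std::io::Write;

/// Size hint for the in-memory buffer; it may grow beyond this if needed.
const INITIAL_BUFFER_BYTES: usize = 1048576;

/// Once the buffer exceeds this many bytes, its contents are written out.
const BUFFER_FLUSH_BYTES: usize = 1024000;

/// Hypothetical writer that buffers serialized chunks and flushes them to an
/// underlying sink (the object store, in the diff) past a size threshold.
struct ThresholdWriter<W: Write> {
    buffer: Vec<u8>,
    sink: W,
}

impl<W: Write> ThresholdWriter<W> {
    fn new(sink: W) -> Self {
        Self {
            buffer: Vec::with_capacity(INITIAL_BUFFER_BYTES),
            sink,
        }
    }

    /// Buffer one serialized chunk, flushing if the threshold is exceeded.
    fn write_chunk(&mut self, bytes: &[u8]) -> std::io::Result<()> {
        self.buffer.extend_from_slice(bytes);
        if self.buffer.len() > BUFFER_FLUSH_BYTES {
            self.sink.write_all(&self.buffer)?;
            self.buffer.clear();
        }
        Ok(())
    }

    /// Write any remaining buffered bytes once the file is complete.
    fn finish(mut self) -> std::io::Result<()> {
        self.sink.write_all(&self.buffer)?;
        self.sink.flush()
    }
}

fn main() -> std::io::Result<()> {
    // Write ~3 MB in 64 KiB chunks; a flush happens roughly every ~1 MB.
    let mut writer = ThresholdWriter::new(std::io::sink());
    for _ in 0..48 {
        writer.write_chunk(&[0u8; 65536])?;
    }
    writer.finish()
}
```

Naming the two sizes also makes the intentional asymmetry visible: the flush threshold (1024000) is slightly smaller than the initial capacity (1048576), so in the common case a flush is triggered before the buffer has to reallocate.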