
Minor: name some constant values in arrow writer, parquet writer (#8642)
* Minor: name some constant values in arrow writer

* Add constants to parquet.rs, update doc comments

* fix
alamb authored Dec 27, 2023
1 parent bb99d2a commit 28ca6d1
Showing 5 changed files with 28 additions and 10 deletions.
13 changes: 10 additions & 3 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Apache Arrow format abstractions
+//! [`ArrowFormat`]: Apache Arrow [`FileFormat`] abstractions
 //!
 //! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)

@@ -58,6 +58,13 @@ use super::file_compression_type::FileCompressionType;
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
 
+/// Initial writing buffer size. Note this is just a size hint for efficiency. It
+/// will grow beyond the set value if needed.
+const INITIAL_BUFFER_BYTES: usize = 1048576;
+
+/// If the buffered Arrow data exceeds this size, it is flushed to object store
+const BUFFER_FLUSH_BYTES: usize = 1024000;
+
 /// Arrow `FileFormat` implementation.
 #[derive(Default, Debug)]
 pub struct ArrowFormat;
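The comment on `INITIAL_BUFFER_BYTES` is worth unpacking: the value is a capacity hint, not a cap. A minimal sketch of that behavior, using a plain `Vec<u8>` as a stand-in for the buffer inside `SharedBuffer`:

```rust
/// Same value the commit names; 1048576 bytes is 1 MiB.
const INITIAL_BUFFER_BYTES: usize = 1048576;

fn main() {
    // Pre-allocating the hint avoids reallocation for small writes...
    let mut buffer: Vec<u8> = Vec::with_capacity(INITIAL_BUFFER_BYTES);
    assert!(buffer.capacity() >= INITIAL_BUFFER_BYTES);

    // ...but writing past the hint simply grows the allocation;
    // nothing is truncated or rejected.
    buffer.resize(2 * INITIAL_BUFFER_BYTES, 0);
    assert!(buffer.len() > INITIAL_BUFFER_BYTES);
    println!("len = {}, capacity = {}", buffer.len(), buffer.capacity());
}
```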
@@ -239,7 +246,7 @@ impl DataSink for ArrowFileSink {
             IpcWriteOptions::try_new(64, false, arrow_ipc::MetadataVersion::V5)?
                 .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
         while let Some((path, mut rx)) = file_stream_rx.recv().await {
-            let shared_buffer = SharedBuffer::new(1048576);
+            let shared_buffer = SharedBuffer::new(INITIAL_BUFFER_BYTES);
             let mut arrow_writer = arrow_ipc::writer::FileWriter::try_new_with_options(
                 shared_buffer.clone(),
                 &self.get_writer_schema(),
@@ -257,7 +264,7 @@
                 row_count += batch.num_rows();
                 arrow_writer.write(&batch)?;
                 let mut buff_to_flush = shared_buffer.buffer.try_lock().unwrap();
-                if buff_to_flush.len() > 1024000 {
+                if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                     object_store_writer
                         .write_all(buff_to_flush.as_slice())
                         .await?;
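The hunk above is cut off by the diff, but the shape of the loop is clear: append each encoded batch to the shared buffer, and once it passes `BUFFER_FLUSH_BYTES`, drain it to the object store writer. A minimal synchronous sketch of that pattern (assuming, as the elided part of the hunk suggests, that the buffer is cleared after each flush; `std::io::Write` stands in for the async object store writer):

```rust
use std::io::Write;

/// Flush threshold, matching the constant named in this commit.
const BUFFER_FLUSH_BYTES: usize = 1024000;

/// Append `bytes` to `buffer`, draining to `sink` once the buffer
/// exceeds the flush threshold. Mirrors the shape of the loop in
/// `ArrowFileSink::write_all`, minus the async I/O.
fn buffered_write(
    buffer: &mut Vec<u8>,
    sink: &mut impl Write,
    bytes: &[u8],
) -> std::io::Result<()> {
    buffer.extend_from_slice(bytes);
    if buffer.len() > BUFFER_FLUSH_BYTES {
        sink.write_all(buffer.as_slice())?;
        buffer.clear(); // assumed: the elided code resets the buffer after flushing
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    let mut buffer: Vec<u8> = Vec::new();
    let mut sink: Vec<u8> = Vec::new(); // stands in for the object store writer
    for _ in 0..2048 {
        buffered_write(&mut buffer, &mut sink, &[0u8; 1024])?;
    }
    println!("flushed {} bytes, {} still buffered", sink.len(), buffer.len());
    Ok(())
}
```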
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/avro.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Apache Avro format abstractions
+//! [`AvroFormat`] Apache Avro [`FileFormat`] abstractions
 
 use std::any::Any;
 use std::sync::Arc;
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! CSV format abstractions
+//! [`CsvFormat`], Comma Separated Value (CSV) [`FileFormat`] abstractions
 
 use std::any::Any;
 use std::collections::HashSet;
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/json.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Line delimited JSON format abstractions
+//! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions
 
 use std::any::Any;
 use std::fmt;
19 changes: 15 additions & 4 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Parquet format abstractions
+//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions
 
 use arrow_array::RecordBatch;
 use async_trait::async_trait;
@@ -75,6 +75,17 @@ use crate::physical_plan::{
     Statistics,
 };
 
+/// Size of the buffer for [`AsyncArrowWriter`].
+const PARQUET_WRITER_BUFFER_SIZE: usize = 10485760;
+
+/// Initial writing buffer size. Note this is just a size hint for efficiency. It
+/// will grow beyond the set value if needed.
+const INITIAL_BUFFER_BYTES: usize = 1048576;
+
+/// When writing parquet files in parallel, if the buffered Parquet data exceeds
+/// this size, it is flushed to object store
+const BUFFER_FLUSH_BYTES: usize = 1024000;
+
 /// The Apache Parquet `FileFormat` implementation
 ///
 /// Note it is recommended these are instead configured on the [`ConfigOptions`]
@@ -680,7 +691,7 @@ impl ParquetSink {
         let writer = AsyncArrowWriter::try_new(
             multipart_writer,
             self.get_writer_schema(),
-            10485760,
+            PARQUET_WRITER_BUFFER_SIZE,
             Some(parquet_props),
         )?;
         Ok(writer)
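`PARQUET_WRITER_BUFFER_SIZE` (10485760 bytes, i.e. 10 MiB) is handed straight to `AsyncArrowWriter::try_new`, which in the parquet crate version this commit targets (the 4-argument `try_new` shown above; later releases dropped the explicit buffer-size argument) uses it as the capacity of the writer's internal staging buffer. A minimal sketch of the same call shape, with an in-memory `Vec<u8>` standing in for the multipart object store writer (assumes the parquet crate's `async` feature and tokio):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use parquet::arrow::AsyncArrowWriter;

/// 10 MiB, matching the constant named in this commit.
const PARQUET_WRITER_BUFFER_SIZE: usize = 10485760;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let col: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_from_iter([("a", col)])?;

    // Vec<u8> implements tokio's AsyncWrite, standing in for the
    // multipart object store writer used by ParquetSink.
    let mut out: Vec<u8> = Vec::new();
    let mut writer = AsyncArrowWriter::try_new(
        &mut out,
        batch.schema(),
        PARQUET_WRITER_BUFFER_SIZE, // staging buffer size (4-arg API as in the diff)
        None,                       // default WriterProperties
    )?;
    writer.write(&batch).await?;
    writer.close().await?;
    println!("wrote {} parquet bytes", out.len());
    Ok(())
}
```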
Expand Down Expand Up @@ -1004,7 +1015,7 @@ async fn concatenate_parallel_row_groups(
writer_props: Arc<WriterProperties>,
mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
) -> Result<usize> {
let merged_buff = SharedBuffer::new(1048576);
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);

let schema_desc = arrow_to_parquet_schema(schema.as_ref())?;
let mut parquet_writer = SerializedFileWriter::new(
@@ -1025,7 +1036,7 @@
         for chunk in serialized_columns {
             chunk.append_to_row_group(&mut rg_out)?;
             let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
-            if buff_to_flush.len() > 1024000 {
+            if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                 object_store_writer
                     .write_all(buff_to_flush.as_slice())
                     .await?;
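A closing note on magnitudes: the two buffer sizes are exact binary megabytes, while the flush threshold is slightly under one. Spelled out as arithmetic (a sketch for the reader; the committed code keeps the plain literals):

```rust
const PARQUET_WRITER_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10485760 = 10 MiB
const INITIAL_BUFFER_BYTES: usize = 1024 * 1024; // 1048576 = 1 MiB
const BUFFER_FLUSH_BYTES: usize = 1000 * 1024; // 1024000 = 1000 KiB

fn main() {
    assert_eq!(PARQUET_WRITER_BUFFER_SIZE, 10485760);
    assert_eq!(INITIAL_BUFFER_BYTES, 1048576);
    assert_eq!(BUFFER_FLUSH_BYTES, 1024000);
}
```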
