Minor: name some constant values in arrow writer, parquet writer #8642

Merged 3 commits on Dec 27, 2023
13 changes: 10 additions & 3 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

-//! Apache Arrow format abstractions
+//! [`ArrowFormat`]: Apache Arrow [`FileFormat`] abstractions
//!
//! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)

@@ -58,6 +58,13 @@ use super::file_compression_type::FileCompressionType;
use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};

+/// Initial writing buffer size. Note this is just a size hint for efficiency. It
+/// will grow beyond the set value if needed.
+const INITIAL_BUFFER_BYTES: usize = 1048576;
+
+/// If the buffered Arrow data exceeds this size, it is flushed to object store
+const BUFFER_FLUSH_BYTES: usize = 1024000;
+
/// Arrow `FileFormat` implementation.
#[derive(Default, Debug)]
pub struct ArrowFormat;
@@ -239,7 +246,7 @@ impl DataSink for ArrowFileSink {
IpcWriteOptions::try_new(64, false, arrow_ipc::MetadataVersion::V5)?
.try_with_compression(Some(CompressionType::LZ4_FRAME))?;
while let Some((path, mut rx)) = file_stream_rx.recv().await {
-let shared_buffer = SharedBuffer::new(1048576);
+let shared_buffer = SharedBuffer::new(INITIAL_BUFFER_BYTES);
let mut arrow_writer = arrow_ipc::writer::FileWriter::try_new_with_options(
shared_buffer.clone(),
&self.get_writer_schema(),
@@ -257,7 +264,7 @@
row_count += batch.num_rows();
arrow_writer.write(&batch)?;
let mut buff_to_flush = shared_buffer.buffer.try_lock().unwrap();
-if buff_to_flush.len() > 1024000 {
+if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
object_store_writer
.write_all(buff_to_flush.as_slice())
.await?;
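
Note: the two constants added above drive a simple buffer-and-flush loop in the Arrow sink: serialized IPC bytes accumulate in memory, and once they exceed BUFFER_FLUSH_BYTES they are written out to the object store writer, as the hunk above shows. Below is a minimal, synchronous Rust sketch of that pattern, reusing the constant names purely for illustration; DataFusion's actual sink is async and writes to object storage, so none of the helper names here come from the codebase.

use std::io::Write;

/// Size hint for the in-memory buffer; it may grow past this if needed.
const INITIAL_BUFFER_BYTES: usize = 1048576;
/// Once buffered bytes exceed this threshold, they are pushed downstream.
const BUFFER_FLUSH_BYTES: usize = 1024000;

/// Accumulate `chunks` in memory and flush to `sink` whenever the buffer
/// crosses BUFFER_FLUSH_BYTES, mirroring the threshold check in the diff.
fn buffer_and_flush<W: Write>(chunks: &[&[u8]], sink: &mut W) -> std::io::Result<()> {
    let mut buffer: Vec<u8> = Vec::with_capacity(INITIAL_BUFFER_BYTES);
    for chunk in chunks {
        buffer.extend_from_slice(chunk);
        if buffer.len() > BUFFER_FLUSH_BYTES {
            sink.write_all(&buffer)?;
            buffer.clear();
        }
    }
    // Write out whatever remains below the threshold at the end.
    sink.write_all(&buffer)?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    let mut sink: Vec<u8> = Vec::new();
    let chunk = vec![0u8; 600_000];
    let chunks: [&[u8]; 3] = [&chunk, &chunk, &chunk];
    // Flushes once mid-stream (600 KB + 600 KB crosses the threshold), then
    // writes the final 600 KB remainder.
    buffer_and_flush(&chunks, &mut sink)?;
    assert_eq!(sink.len(), 1_800_000);
    Ok(())
}
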
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/avro.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

-//! Apache Avro format abstractions
+//! [`AvroFormat`] Apache Avro [`FileFormat`] abstractions

use std::any::Any;
use std::sync::Arc;
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

-//! CSV format abstractions
+//! [`CsvFormat`], Comma Separated Value (CSV) [`FileFormat`] abstractions

use std::any::Any;
use std::collections::HashSet;
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/json.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

-//! Line delimited JSON format abstractions
+//! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions

use std::any::Any;
use std::fmt;
19 changes: 15 additions & 4 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

-//! Parquet format abstractions
+//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions

use arrow_array::RecordBatch;
use async_trait::async_trait;
@@ -75,6 +75,17 @@ use crate::physical_plan::{
Statistics,
};

+/// Size of the buffer for [`AsyncArrowWriter`].
+const PARQUET_WRITER_BUFFER_SIZE: usize = 10485760;
+
Review comment (PR author): I don't know why these values are (slightly) different, but I figured we could start by keeping them the same and then could unify them as a follow on if needed
+/// Initial writing buffer size. Note this is just a size hint for efficiency. It
+/// will grow beyond the set value if needed.
+const INITIAL_BUFFER_BYTES: usize = 1048576;
+
+/// When writing parquet files in parallel, if the buffered Parquet data exceeds
+/// this size, it is flushed to object store
+const BUFFER_FLUSH_BYTES: usize = 1024000;
+
/// The Apache Parquet `FileFormat` implementation
///
/// Note it is recommended these are instead configured on the [`ConfigOptions`]
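
Note: the review comment above observes that these constants are close but not identical. Writing them as expressions makes the relationship visible: the parquet writer buffer is 10 MiB, the initial shared buffer is 1 MiB, and the flush threshold is 1000 KiB. The sketch below is illustrative arithmetic only, not a change proposed in the PR.

fn main() {
    // The three constants from this diff, written as expressions so the
    // units are visible; these exact rewrites are illustrative only.
    const PARQUET_WRITER_BUFFER_SIZE: usize = 10 * 1024 * 1024; // 10_485_760 = 10 MiB
    const INITIAL_BUFFER_BYTES: usize = 1024 * 1024;            // 1_048_576  = 1 MiB
    const BUFFER_FLUSH_BYTES: usize = 1000 * 1024;              // 1_024_000  = 1000 KiB
    assert_eq!(PARQUET_WRITER_BUFFER_SIZE, 10485760);
    assert_eq!(INITIAL_BUFFER_BYTES, 1048576);
    assert_eq!(BUFFER_FLUSH_BYTES, 1024000);
    // The flush threshold sits 24_576 bytes below the initial capacity.
    println!("gap: {} bytes", INITIAL_BUFFER_BYTES - BUFFER_FLUSH_BYTES);
}
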
@@ -680,7 +691,7 @@ impl ParquetSink {
let writer = AsyncArrowWriter::try_new(
multipart_writer,
self.get_writer_schema(),
-10485760,
+PARQUET_WRITER_BUFFER_SIZE,
Some(parquet_props),
)?;
Ok(writer)
@@ -1004,7 +1015,7 @@ async fn concatenate_parallel_row_groups(
writer_props: Arc<WriterProperties>,
mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
) -> Result<usize> {
-let merged_buff = SharedBuffer::new(1048576);
+let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);

let schema_desc = arrow_to_parquet_schema(schema.as_ref())?;
let mut parquet_writer = SerializedFileWriter::new(
@@ -1025,7 +1036,7 @@
for chunk in serialized_columns {
chunk.append_to_row_group(&mut rg_out)?;
let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
-if buff_to_flush.len() > 1024000 {
+if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
object_store_writer
.write_all(buff_to_flush.as_slice())
.await?;
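
Note: both sinks hand a clone of `SharedBuffer` (imported via `use super::write::{create_writer, SharedBuffer}` in the arrow.rs hunk above) to the Arrow or Parquet file writer, then lock `shared_buffer.buffer` and drain it once it grows past BUFFER_FLUSH_BYTES. Below is a simplified stand-in that assumes only what the diff shows, namely a clonable handle over a locked Vec<u8> that implements `Write`; it is not the real `SharedBuffer` implementation, and `SharedBufferSketch` is a hypothetical name.

use std::io::{self, Write};
use std::sync::{Arc, Mutex};

/// Simplified stand-in for the `SharedBuffer` used in the diff: a clonable
/// handle to a locked byte buffer that a file writer can treat as a `Write`
/// target while the sink drains it from the other side.
#[derive(Clone)]
struct SharedBufferSketch {
    buffer: Arc<Mutex<Vec<u8>>>,
}

impl SharedBufferSketch {
    fn new(capacity: usize) -> Self {
        Self {
            buffer: Arc::new(Mutex::new(Vec::with_capacity(capacity))),
        }
    }
}

impl Write for SharedBufferSketch {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Append under the lock; the sink side locks the same Vec to drain it.
        self.buffer.lock().unwrap().extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let shared = SharedBufferSketch::new(1024 * 1024);

    // The "writer" side (the role FileWriter / SerializedFileWriter play in
    // the diff) holds a clone and writes serialized bytes into it.
    let mut writer_handle = shared.clone();
    writer_handle.write_all(b"row group bytes")?;

    // The "sink" side inspects and drains the same underlying buffer.
    let mut drained = Vec::new();
    {
        let mut inner = shared.buffer.lock().unwrap();
        drained.append(&mut *inner);
    }
    assert_eq!(drained, b"row group bytes".to_vec());
    Ok(())
}
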