
feat(rust): expose WriterProperties method on RecordBatchWriter and DeltaWriter #1497
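In short, this change threads a caller-supplied `parquet` `WriterProperties` through the Rust write paths: the `WriteBuilder` behind `DeltaOps::write`, the operations-level `DeltaWriter`, and `RecordBatchWriter`. A minimal sketch of the intended call pattern, assuming an existing `table` and `batch` inside an async function, with error handling elided:

```rust
use deltalake::writer::RecordBatchWriter;
use deltalake::DeltaOps;
use parquet::{
    basic::{Compression, ZstdLevel},
    file::properties::WriterProperties,
};

// Custom parquet settings, e.g. ZSTD at level 3 as in the updated examples.
let writer_properties = WriterProperties::builder()
    .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
    .build();

// Operations API: hand the properties to the write builder.
let table = DeltaOps(table)
    .write(vec![batch.clone()])
    .with_writer_properties(writer_properties)
    .await?;

// RecordBatchWriter: attach the properties when constructing the writer.
let writer_properties = WriterProperties::builder()
    .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
    .build();
let mut writer = RecordBatchWriter::for_table(&table)
    .expect("Failed to make RecordBatchWriter")
    .with_writer_properties(writer_properties);
```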

19 changes: 18 additions & 1 deletion rust/examples/basic_operations.rs
@@ -5,6 +5,11 @@ use arrow::{
};
use deltalake::operations::collect_sendable_stream;
use deltalake::{action::SaveMode, DeltaOps, SchemaDataType, SchemaField};
use parquet::{
    basic::{Compression, ZstdLevel},
    file::properties::WriterProperties,
};

use std::sync::Arc;

fn get_table_columns() -> Vec<SchemaField> {
@@ -55,15 +60,27 @@ async fn main() -> Result<(), deltalake::errors::DeltaTableError> {

    assert_eq!(table.version(), 0);

    let writer_properties = WriterProperties::builder()
        .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
        .build();

    let batch = get_table_batches();
    let table = DeltaOps(table).write(vec![batch.clone()]).await?;
    let table = DeltaOps(table)
        .write(vec![batch.clone()])
        .with_writer_properties(writer_properties)
        .await?;

    assert_eq!(table.version(), 1);

    let writer_properties = WriterProperties::builder()
        .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
        .build();

    // To overwrite instead of append (which is the default), use `.with_save_mode`:
    let table = DeltaOps(table)
        .write(vec![batch.clone()])
        .with_save_mode(SaveMode::Overwrite)
        .with_writer_properties(writer_properties)
        .await?;

    assert_eq!(table.version(), 2);
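Note that `with_writer_properties` takes the properties by value, which is why the example builds a fresh `WriterProperties` before each write. If the pinned `parquet` crate implements `Clone` for `WriterProperties` (recent releases do), a single value can be reused instead; a hedged sketch of that variant:

```rust
// Build once, clone per write (assumes WriterProperties: Clone in the parquet
// version in use).
let writer_properties = WriterProperties::builder()
    .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
    .build();

let table = DeltaOps(table)
    .write(vec![batch.clone()])
    .with_writer_properties(writer_properties.clone())
    .await?;

let table = DeltaOps(table)
    .write(vec![batch.clone()])
    .with_save_mode(SaveMode::Overwrite)
    .with_writer_properties(writer_properties)
    .await?;
```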
15 changes: 12 additions & 3 deletions rust/examples/recordbatch-writer.rs
Collaborator


Thanks for adding this to the example :)

@@ -14,8 +14,12 @@ use deltalake::errors::DeltaTableError;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use deltalake::*;
use log::*;

use object_store::path::Path;
use parquet::{
    basic::{Compression, ZstdLevel},
    file::properties::WriterProperties,
};

use std::collections::HashMap;
use std::sync::Arc;

@@ -42,8 +46,13 @@ async fn main() -> Result<(), anyhow::Error> {
        Err(err) => Err(err).unwrap(),
    };

    let mut writer =
        RecordBatchWriter::for_table(&table).expect("Failed to make RecordBatchWriter");
    let writer_properties = WriterProperties::builder()
        .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
        .build();

    let mut writer = RecordBatchWriter::for_table(&table)
        .expect("Failed to make RecordBatchWriter")
        .with_writer_properties(writer_properties);

    let records = fetch_readings();
    let batch = convert_to_batch(&table, &records);
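The rest of the example is unchanged: the writer configured above buffers record batches and flushes them as parquet files using the chosen compression. A hedged sketch of the follow-on calls (`write` and `flush_and_commit` come from the `DeltaWriter` trait already imported in this example; `batch` and a mutable `table` are assumed from the surrounding code):

```rust
// Buffer the batch, then flush the parquet data (written with the ZSTD settings
// configured above) and commit the resulting add actions to the Delta log.
writer.write(batch).await?;
writer.flush_and_commit(&mut table).await?;
```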
11 changes: 10 additions & 1 deletion rust/src/operations/write.rs
@@ -99,6 +99,8 @@ pub struct WriteBuilder {
    batches: Option<Vec<RecordBatch>>,
    /// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false)
    safe_cast: bool,
    /// Parquet writer properties
    writer_properties: Option<WriterProperties>,
}

impl WriteBuilder {
@@ -116,6 +118,7 @@ impl WriteBuilder {
            write_batch_size: None,
            batches: None,
            safe_cast: false,
            writer_properties: None,
        }
    }
@@ -178,6 +181,12 @@ impl WriteBuilder {
        self
    }

    /// Specify the writer properties to use when writing a parquet file
    pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
        self.writer_properties = Some(writer_properties);
        self
    }

    async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
        match self.store.is_delta_table_location().await? {
            true => {
@@ -390,7 +399,7 @@ impl std::future::IntoFuture for WriteBuilder {
                this.store.clone(),
                this.target_file_size,
                this.write_batch_size,
                None,
                this.writer_properties,
                this.safe_cast,
            )
            .await?;
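This is the core of the change on the operations path: `WriteBuilder` stores an `Option<WriterProperties>`, exposes a builder-style setter, and passes the stored value where a hard-coded `None` used to go, so writes without explicit properties keep the previous default behavior. A hedged, illustrative sketch of how such an option is typically resolved downstream (the actual default construction lives in the internal writer, outside this diff):

```rust
// Illustrative only: an unset Option<WriterProperties> falls back to parquet's
// stock defaults, matching the behavior before this change.
let effective_properties = writer_properties
    .unwrap_or_else(|| WriterProperties::builder().build());
```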
6 changes: 6 additions & 0 deletions rust/src/operations/writer.rs
@@ -134,6 +134,12 @@ impl DeltaWriter {
        }
    }

    /// Apply custom writer_properties to the underlying parquet writer
    pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
        self.config.writer_properties = writer_properties;
        self
    }

    fn divide_by_partition_values(
        &mut self,
        values: &RecordBatch,
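On the operations-level `DeltaWriter`, the writer config already holds `WriterProperties`, so the new method simply replaces them rather than storing an `Option`. A minimal sketch of the intended chaining, assuming a `writer: DeltaWriter` constructed elsewhere (its constructor is outside this diff):

```rust
// Override the parquet settings on an existing operations-level DeltaWriter;
// the compression value mirrors the examples above.
let writer = writer.with_writer_properties(
    WriterProperties::builder()
        .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
        .build(),
);
```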
6 changes: 6 additions & 0 deletions rust/src/writer/record_batch.rs
@@ -196,6 +196,12 @@ impl RecordBatchWriter {
        Ok(())
    }

    /// Sets the writer properties for the underlying arrow writer.
    pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
        self.writer_properties = writer_properties;
        self
    }

    fn divide_by_partition_values(
        &mut self,
        values: &RecordBatch,
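Because this setter simply overwrites `self.writer_properties`, any option on the parquet builder flows through, not only compression. A hedged sketch with two standard `WriterProperties` knobs (the specific values are illustrative):

```rust
use parquet::{basic::Compression, file::properties::WriterProperties};

// Snappy compression plus a custom row-group size; both are standard
// parquet-rs builder options and are passed through to the arrow writer.
let props = WriterProperties::builder()
    .set_compression(Compression::SNAPPY)
    .set_max_row_group_size(128 * 1024)
    .build();

let mut writer = RecordBatchWriter::for_table(&table)
    .expect("Failed to make RecordBatchWriter")
    .with_writer_properties(props);
```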