Commit

update writers to include compression algo
Blajda committed Jun 3, 2023
1 parent b17f286 commit daea2a0
Showing 4 changed files with 129 additions and 30 deletions.
14 changes: 8 additions & 6 deletions rust/src/operations/writer.rs
@@ -7,7 +7,7 @@ use crate::storage::ObjectStoreRef;
 use crate::writer::record_batch::{divide_by_partition_values, PartitionResult};
 use crate::writer::stats::create_add;
 use crate::writer::utils::{
-    arrow_schema_without_partitions, record_batch_without_partitions, PartitionPath,
+    self, arrow_schema_without_partitions, record_batch_without_partitions, PartitionPath,
     ShareableBuffer,
 };
 use crate::{crate_version, DeltaResult, DeltaTableError};
@@ -296,12 +296,14 @@ impl PartitionWriter {
     }
 
     fn next_data_path(&mut self) -> Path {
-        let part = format!("{:0>5}", self.part_counter);
         self.part_counter += 1;
-        // TODO: what does c000 mean?
-        // TODO handle file name for different compressions
-        let file_name = format!("part-{}-{}-c000.snappy.parquet", part, self.writer_id);
-        self.config.prefix.child(file_name)
+
+        utils::next_data_path(
+            &self.config.prefix,
+            self.part_counter,
+            &self.writer_id,
+            &self.config.writer_properties,
+        )
     }
 
     fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {

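For reference, the file naming that this delegation produces can be sketched outside the crate. The helper below mirrors the scheme implemented in rust/src/writer/utils.rs later in this commit, reading the writer-wide codec from WriterProperties and appending the matching suffix; sketch_file_name is a hypothetical name for illustration, not part of the delta-rs API.

use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;
use uuid::Uuid;

// Hypothetical standalone helper mirroring the naming scheme added in this commit.
fn sketch_file_name(part_count: usize, writer_id: &Uuid, props: &WriterProperties) -> String {
    // An empty column path yields the writer-wide default compression.
    let ext = match props.compression(&ColumnPath::new(Vec::new())) {
        Compression::UNCOMPRESSED => "",
        Compression::SNAPPY => ".snappy",
        Compression::GZIP(_) => ".gz",
        Compression::LZO => ".lzo",
        Compression::BROTLI(_) => ".brotli",
        Compression::LZ4 => ".lz4",
        Compression::ZSTD(_) => ".zstd",
        Compression::LZ4_RAW => ".lz4_raw",
    };
    format!("part-{part_count:0>5}-{writer_id}-c000{ext}.parquet")
}

fn main() {
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let id = Uuid::new_v4();
    // Prints something like "part-00001-<uuid>-c000.snappy.parquet".
    println!("{}", sketch_file_name(1, &id, &props));
}
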
11 changes: 9 additions & 2 deletions rust/src/writer/json.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
 use super::stats::create_add;
 use super::utils::{
     arrow_schema_without_partitions, next_data_path, record_batch_from_message,
-    record_batch_without_partitions, stringified_partition_value,
+    record_batch_without_partitions, stringified_partition_value, PartitionPath,
 };
 use super::{DeltaWriter, DeltaWriterError};
 use crate::builder::DeltaTableBuilder;
@@ -17,12 +17,14 @@ use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use arrow::record_batch::*;
 use bytes::Bytes;
 use log::{info, warn};
+use object_store::path::Path;
 use object_store::ObjectStore;
 use parquet::{
     arrow::ArrowWriter, basic::Compression, errors::ParquetError,
     file::properties::WriterProperties,
 };
 use serde_json::Value;
+use uuid::Uuid;
 
 type BadValue = (Value, ParquetError);
 
@@ -361,7 +363,12 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
 
         for (_, writer) in writers {
             let metadata = writer.arrow_writer.close()?;
-            let path = next_data_path(&self.partition_columns, &writer.partition_values, None)?;
+            let prefix =
+                PartitionPath::from_hashmap(&self.partition_columns, &writer.partition_values)?;
+            let prefix = Path::parse(prefix)?;
+            let uuid = Uuid::new_v4();
+
+            let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
             let obj_bytes = Bytes::from(writer.buffer.to_vec());
             let file_size = obj_bytes.len() as i64;
             self.storage.put(&path, obj_bytes).await?;

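A usage-level sketch of why this matters (plain arrow and parquet crates, not the delta-rs writers; variable names are illustrative): the same WriterProperties value both configures the ArrowWriter that encodes the in-memory buffer and, after this change, determines the file-name suffix, so the advertised codec and the actual bytes stay in agreement.

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // The codec set here is what next_data_path reads when building the file name.
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();

    // Write into an in-memory buffer, much like the writers above do with ShareableBuffer.
    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;

    // `buffer` would then be uploaded under a name such as
    // "part-00000-<uuid>-c000.snappy.parquet".
    Ok(())
}
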
9 changes: 7 additions & 2 deletions rust/src/writer/record_batch.rs
@@ -48,9 +48,10 @@ use arrow::record_batch::RecordBatch;
 use arrow_array::ArrayRef;
 use arrow_row::{RowConverter, SortField};
 use bytes::Bytes;
-use object_store::ObjectStore;
+use object_store::{path::Path, ObjectStore};
 use parquet::{arrow::ArrowWriter, errors::ParquetError};
 use parquet::{basic::Compression, file::properties::WriterProperties};
+use uuid::Uuid;
 
 /// Writes messages to a delta lake table.
 pub struct RecordBatchWriter {
@@ -228,7 +229,11 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
 
         for (_, writer) in writers {
             let metadata = writer.arrow_writer.close()?;
-            let path = next_data_path(&self.partition_columns, &writer.partition_values, None)?;
+            let prefix =
+                PartitionPath::from_hashmap(&self.partition_columns, &writer.partition_values)?;
+            let prefix = Path::parse(prefix)?;
+            let uuid = Uuid::new_v4();
+            let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
             let obj_bytes = Bytes::from(writer.buffer.to_vec());
             let file_size = obj_bytes.len() as i64;
             self.storage.put(&path, obj_bytes).await?;

125 changes: 105 additions & 20 deletions rust/src/writer/utils.rs
@@ -20,6 +20,9 @@ use arrow::json::ReaderBuilder;
 use arrow::record_batch::*;
 use object_store::path::Path;
 use parking_lot::RwLock;
+use parquet::basic::Compression;
+use parquet::file::properties::WriterProperties;
+use parquet::schema::types::ColumnPath;
 use serde_json::Value;
 use uuid::Uuid;
 
@@ -78,30 +81,45 @@ impl Display for PartitionPath {
 // TODO: parquet files have a 5 digit zero-padded prefix and a "c\d{3}" suffix that
 // I have not been able to find documentation for yet.
 pub(crate) fn next_data_path(
-    partition_columns: &[String],
-    partition_values: &HashMap<String, Option<String>>,
-    part: Option<i32>,
-) -> Result<Path, DeltaWriterError> {
-    // TODO: what does 00000 mean?
-    // TODO (roeap): my understanding is, that the values are used as a counter - i.e. if a single batch of
-    // data written to one partition needs to be split due to desired file size constraints.
-    let first_part = match part {
-        Some(count) => format!("{count:0>5}"),
-        _ => "00000".to_string(),
-    };
-    let uuid_part = Uuid::new_v4();
-    // TODO: what does c000 mean?
-    let last_part = "c000";
+    prefix: &Path,
+    part_count: usize,
+    writer_id: &Uuid,
+    writer_properties: &WriterProperties,
+) -> Path {
+    fn compression_to_str(compression: &Compression) -> &str {
+        match compression {
+            Compression::UNCOMPRESSED => "",
+            Compression::SNAPPY => "snappy",
+            Compression::GZIP(_) => "gz",
+            Compression::LZO => "lzo",
+            Compression::BROTLI(_) => "brotli",
+            Compression::LZ4 => "lz4",
+            Compression::ZSTD(_) => "zstd",
+            Compression::LZ4_RAW => "lz4_raw",
+        }
+    }
+
+    // We can not access the default column properties but the current implementation will return
+    // the default compression when the column is not found
+    let column_path = ColumnPath::new(Vec::new());
+    let compression = writer_properties.compression(&column_path);
+
+    let part = format!("{:0>5}", part_count);
 
-    // NOTE: If we add a non-snappy option, file name must change
-    let file_name = format!("part-{first_part}-{uuid_part}-{last_part}.snappy.parquet");
+    // TODO: what does c000 mean?
+    let mut file_name = format!("part-{}-{}-c000", part, writer_id);
 
-    if partition_columns.is_empty() {
-        return Ok(Path::from(file_name));
+    match compression {
+        Compression::UNCOMPRESSED => {}
+        _ => {
+            file_name.push('.');
+            file_name.push_str(compression_to_str(&compression));
+        }
     }
 
-    let partition_key = PartitionPath::from_hashmap(partition_columns, partition_values)?;
-    Ok(Path::from(format!("{partition_key}/{file_name}")))
+    file_name.push_str(".parquet");
+
+    prefix.child(file_name)
 }
 
 /// Convert a vector of json values to a RecordBatch
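The comment about default column properties is worth unpacking: WriterProperties::compression is looked up per column path, and in the current parquet implementation a path that matches no configured column falls back to the writer-wide default set via set_compression. A minimal sketch of that lookup (not part of this commit):

use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

fn main() {
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();

    // An empty path names no real column, so the lookup returns the
    // writer-wide default codec configured above.
    let codec = props.compression(&ColumnPath::new(Vec::new()));
    assert_eq!(codec, Compression::SNAPPY);
}
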
@@ -291,6 +309,7 @@ mod tests {
         TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
         UInt32Array, UInt64Array, UInt8Array,
     };
+    use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
 
     #[test]
     fn test_stringified_partition_value() {
@@ -347,4 +366,70 @@
             )
         }
     }
+
+    #[test]
+    fn test_data_path() {
+        let prefix = Path::parse("x=0/y=0").unwrap();
+        let uuid = Uuid::parse_str("02f09a3f-1624-3b1d-8409-44eff7708208").unwrap();
+
+        // Validated against Spark
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::UNCOMPRESSED)
+            .build();
+
+        assert_eq!(
+            next_data_path(&prefix, 1, &uuid, &props).as_ref(),
+            "x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.parquet"
+        );
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        assert_eq!(
+            next_data_path(&prefix, 1, &uuid, &props).as_ref(),
+            "x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.snappy.parquet"
+        );
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::GZIP(GzipLevel::default()))
+            .build();
+        assert_eq!(
+            next_data_path(&prefix, 1, &uuid, &props).as_ref(),
+            "x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.gz.parquet"
+        );
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::LZ4)
+            .build();
+        assert_eq!(
+            next_data_path(&prefix, 1, &uuid, &props).as_ref(),
+            "x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.lz4.parquet"
+        );
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::ZSTD(ZstdLevel::default()))
+            .build();
+        assert_eq!(
+            next_data_path(&prefix, 1, &uuid, &props).as_ref(),
+            "x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.zstd.parquet"
+        );
+
+        // Unable to validate against spark
+        let props = WriterProperties::builder()
+            .set_compression(Compression::LZ4_RAW)
+            .build();
+        assert_eq!(
+            next_data_path(&prefix, 1, &uuid, &props).as_ref(),
+            "x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.lz4_raw.parquet"
+        );
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::BROTLI(BrotliLevel::default()))
+            .build();
+        assert_eq!(
+            next_data_path(&prefix, 1, &uuid, &props).as_ref(),
+            "x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.brotli.parquet"
+        );
+    }
 }
