Skip to content

Commit

Permalink
feat: update writers to include compression method in file name (#1431)
Browse files Browse the repository at this point in the history
# Description
The compression name was hard-coded to include snappy however users can
now specify their own methods which will cause a disconnect in the the
name and the method used.

The naming convention is used as a hint by spark and hive to create the
correct reader without reading having read the header of the file.

---------

Co-authored-by: Will Jones <[email protected]>
  • Loading branch information
Blajda and wjones127 authored Jun 7, 2023
1 parent 2db7f9b commit 197db59
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 53 deletions.
33 changes: 17 additions & 16 deletions rust/src/operations/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

use std::collections::HashMap;

use crate::action::Add;
use crate::storage::ObjectStoreRef;
use crate::writer::record_batch::{divide_by_partition_values, PartitionResult};
use crate::writer::stats::create_add;
use crate::writer::utils::{
self, arrow_schema_without_partitions, record_batch_without_partitions, PartitionPath,
ShareableBuffer,
};
use crate::{crate_version, DeltaResult, DeltaTableError};

use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
Expand All @@ -11,17 +21,6 @@ use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

use crate::action::Add;
use crate::crate_version;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::storage::ObjectStoreRef;
use crate::writer::record_batch::{divide_by_partition_values, PartitionResult};
use crate::writer::stats::create_add;
use crate::writer::utils::{
arrow_schema_without_partitions, record_batch_without_partitions, PartitionPath,
ShareableBuffer,
};

// TODO databricks often suggests a file size of 100mb, should we set this default?
const DEFAULT_TARGET_FILE_SIZE: usize = 104_857_600;
const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
Expand Down Expand Up @@ -297,12 +296,14 @@ impl PartitionWriter {
}

fn next_data_path(&mut self) -> Path {
let part = format!("{:0>5}", self.part_counter);
self.part_counter += 1;
// TODO: what does c000 mean?
// TODO handle file name for different compressions
let file_name = format!("part-{}-{}-c000.snappy.parquet", part, self.writer_id);
self.config.prefix.child(file_name)

utils::next_data_path(
&self.config.prefix,
self.part_counter,
&self.writer_id,
&self.config.writer_properties,
)
}

fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
Expand Down
30 changes: 18 additions & 12 deletions rust/src/writer/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,28 @@ use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;

use super::stats::create_add;
use super::utils::{
arrow_schema_without_partitions, next_data_path, record_batch_from_message,
record_batch_without_partitions, stringified_partition_value, PartitionPath,
};
use super::{DeltaWriter, DeltaWriterError};
use crate::builder::DeltaTableBuilder;
use crate::{action::Add, DeltaTable, DeltaTableError, DeltaTableMetaData, Schema};
use crate::{storage::DeltaObjectStore, writer::utils::ShareableBuffer};

use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow::record_batch::*;
use bytes::Bytes;
use log::{info, warn};
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::{
arrow::ArrowWriter, basic::Compression, errors::ParquetError,
file::properties::WriterProperties,
};
use serde_json::Value;

use super::stats::create_add;
use super::utils::{
arrow_schema_without_partitions, next_data_path, record_batch_from_message,
record_batch_without_partitions, stringified_partition_value,
};
use super::{DeltaWriter, DeltaWriterError};
use crate::builder::DeltaTableBuilder;
use crate::errors::DeltaTableError;
use crate::{action::Add, DeltaTable, DeltaTableMetaData, Schema};
use crate::{storage::DeltaObjectStore, writer::utils::ShareableBuffer};
use uuid::Uuid;

type BadValue = (Value, ParquetError);

Expand Down Expand Up @@ -362,7 +363,12 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {

for (_, writer) in writers {
let metadata = writer.arrow_writer.close()?;
let path = next_data_path(&self.partition_columns, &writer.partition_values, None)?;
let prefix =
PartitionPath::from_hashmap(&self.partition_columns, &writer.partition_values)?;
let prefix = Path::parse(prefix)?;
let uuid = Uuid::new_v4();

let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
let obj_bytes = Bytes::from(writer.buffer.to_vec());
let file_size = obj_bytes.len() as i64;
self.storage.put(&path, obj_bytes).await?;
Expand Down
9 changes: 7 additions & 2 deletions rust/src/writer/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ use arrow::record_batch::RecordBatch;
use arrow_array::ArrayRef;
use arrow_row::{RowConverter, SortField};
use bytes::Bytes;
use object_store::ObjectStore;
use object_store::{path::Path, ObjectStore};
use parquet::{arrow::ArrowWriter, errors::ParquetError};
use parquet::{basic::Compression, file::properties::WriterProperties};
use uuid::Uuid;

use super::stats::create_add;
use super::utils::{
Expand Down Expand Up @@ -226,7 +227,11 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {

for (_, writer) in writers {
let metadata = writer.arrow_writer.close()?;
let path = next_data_path(&self.partition_columns, &writer.partition_values, None)?;
let prefix =
PartitionPath::from_hashmap(&self.partition_columns, &writer.partition_values)?;
let prefix = Path::parse(prefix)?;
let uuid = Uuid::new_v4();
let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
let obj_bytes = Bytes::from(writer.buffer.to_vec());
let file_size = obj_bytes.len() as i64;
self.storage.put(&path, obj_bytes).await?;
Expand Down
127 changes: 104 additions & 23 deletions rust/src/writer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ use arrow::json::ReaderBuilder;
use arrow::record_batch::*;
use object_store::path::Path;
use parking_lot::RwLock;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;
use serde_json::Value;
use uuid::Uuid;

Expand Down Expand Up @@ -75,33 +78,46 @@ impl Display for PartitionPath {
}
}

// TODO: parquet files have a 5 digit zero-padded prefix and a "c\d{3}" suffix that
// I have not been able to find documentation for yet.
/// Generate the name of the file to be written
/// prefix: The location of the file to be written
/// part_count: Used the indicate that single logical partition was split into multiple physical files
/// starts at 0. Is typically used when writer splits that data due to file size constraints
pub(crate) fn next_data_path(
partition_columns: &[String],
partition_values: &HashMap<String, Option<String>>,
part: Option<i32>,
) -> Result<Path, DeltaWriterError> {
// TODO: what does 00000 mean?
// TODO (roeap): my understanding is, that the values are used as a counter - i.e. if a single batch of
// data written to one partition needs to be split due to desired file size constraints.
let first_part = match part {
Some(count) => format!("{count:0>5}"),
_ => "00000".to_string(),
};
let uuid_part = Uuid::new_v4();
// TODO: what does c000 mean?
let last_part = "c000";
prefix: &Path,
part_count: usize,
writer_id: &Uuid,
writer_properties: &WriterProperties,
) -> Path {
fn compression_to_str(compression: &Compression) -> &str {
match compression {
// This is to match HADOOP's convention
// https://github.com/apache/parquet-mr/blob/c4977579ab3b149ea045a177b039f055b5408e8f/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java#L27-L34
Compression::UNCOMPRESSED => "",
Compression::SNAPPY => ".snappy",
Compression::GZIP(_) => ".gz",
Compression::LZO => ".lzo",
Compression::BROTLI(_) => ".br",
Compression::LZ4 => ".lz4",
Compression::ZSTD(_) => ".zstd",
Compression::LZ4_RAW => ".lz4raw",
}
}

// NOTE: If we add a non-snappy option, file name must change
let file_name = format!("part-{first_part}-{uuid_part}-{last_part}.snappy.parquet");
// We can not access the default column properties but the current implementation will return
// the default compression when the column is not found
let column_path = ColumnPath::new(Vec::new());
let compression = writer_properties.compression(&column_path);

if partition_columns.is_empty() {
return Ok(Path::from(file_name));
}
let part = format!("{:0>5}", part_count);

let partition_key = PartitionPath::from_hashmap(partition_columns, partition_values)?;
Ok(Path::from(format!("{partition_key}/{file_name}")))
// TODO: what does c000 mean?
let file_name = format!(
"part-{}-{}-c000{}.parquet",
part,
writer_id,
compression_to_str(&compression)
);
prefix.child(file_name)
}

/// Convert a vector of json values to a RecordBatch
Expand Down Expand Up @@ -291,6 +307,7 @@ mod tests {
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
};
use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};

#[test]
fn test_stringified_partition_value() {
Expand Down Expand Up @@ -347,4 +364,68 @@ mod tests {
)
}
}

#[test]
fn test_data_path() {
let prefix = Path::parse("x=0/y=0").unwrap();
let uuid = Uuid::parse_str("02f09a3f-1624-3b1d-8409-44eff7708208").unwrap();

// Validated against Spark
let props = WriterProperties::builder()
.set_compression(Compression::UNCOMPRESSED)
.build();

assert_eq!(
next_data_path(&prefix, 1, &uuid, &props).as_ref(),
"x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.parquet"
);

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
assert_eq!(
next_data_path(&prefix, 1, &uuid, &props).as_ref(),
"x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.snappy.parquet"
);

let props = WriterProperties::builder()
.set_compression(Compression::GZIP(GzipLevel::default()))
.build();
assert_eq!(
next_data_path(&prefix, 1, &uuid, &props).as_ref(),
"x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.gz.parquet"
);

let props = WriterProperties::builder()
.set_compression(Compression::LZ4)
.build();
assert_eq!(
next_data_path(&prefix, 1, &uuid, &props).as_ref(),
"x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.lz4.parquet"
);

let props = WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.build();
assert_eq!(
next_data_path(&prefix, 1, &uuid, &props).as_ref(),
"x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.zstd.parquet"
);

let props = WriterProperties::builder()
.set_compression(Compression::LZ4_RAW)
.build();
assert_eq!(
next_data_path(&prefix, 1, &uuid, &props).as_ref(),
"x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.lz4raw.parquet"
);

let props = WriterProperties::builder()
.set_compression(Compression::BROTLI(BrotliLevel::default()))
.build();
assert_eq!(
next_data_path(&prefix, 1, &uuid, &props).as_ref(),
"x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.br.parquet"
);
}
}

0 comments on commit 197db59

Please sign in to comment.