feat: add stats to convert-to-delta operation #2491

Merged · 7 commits · May 15, 2024
44 changes: 36 additions & 8 deletions crates/core/src/operations/convert_to_delta.rs
@@ -1,20 +1,23 @@
//! Command for converting a Parquet table to a Delta table in place
// https://github.com/delta-io/delta/blob/1d5dd774111395b0c4dc1a69c94abc169b1c83b6/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala

use crate::operations::write::get_num_idx_cols_and_stats_columns;
use crate::{
kernel::{Add, DataType, Schema, StructField},
logstore::{LogStore, LogStoreRef},
operations::create::CreateBuilder,
protocol::SaveMode,
table::builder::ensure_table_uri,
table::config::DeltaConfigKey,
writer::stats::stats_from_parquet_metadata,
DeltaResult, DeltaTable, DeltaTableError, ObjectStoreError, NULL_PARTITION_VALUE_DATA_PATH,
};
use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError};
use futures::{
future::{self, BoxFuture},
TryStreamExt,
};
use indexmap::IndexMap;
use parquet::{
arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder},
errors::ParquetError,
@@ -284,6 +287,10 @@ impl ConvertToDeltaBuilder {
// A vector of StructField of all unique partition columns in a Parquet table
let mut partition_schema_fields = HashMap::new();

// Obtain settings on which columns to skip collecting stats on if any
let (num_indexed_cols, stats_columns) =
get_num_idx_cols_and_stats_columns(None, self.configuration.clone());

for file in files {
// A HashMap from partition column to value for this parquet file only
let mut partition_values = HashMap::new();
@@ -328,6 +335,24 @@ impl ConvertToDeltaBuilder {
subpath = iter.next();
}

let batch_builder = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
object_store.clone(),
file.clone(),
))
.await?;

// Fetch the stats
let parquet_metadata = batch_builder.metadata();
let stats = stats_from_parquet_metadata(
&IndexMap::from_iter(partition_values.clone().into_iter()),
parquet_metadata.as_ref(),
num_indexed_cols,
&stats_columns,
)
.map_err(|e| Error::DeltaTable(e.into()))?;
let stats_string =
serde_json::to_string(&stats).map_err(|e| Error::DeltaTable(e.into()))?;

actions.push(
Add {
path: percent_decode_str(file.location.as_ref())
@@ -349,19 +374,13 @@ impl ConvertToDeltaBuilder {
.collect(),
modification_time: file.last_modified.timestamp_millis(),
data_change: true,
stats: Some(stats_string),
..Default::default()
}
.into(),
);

let mut arrow_schema = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
object_store.clone(),
file,
))
.await?
.schema()
.as_ref()
.clone();
let mut arrow_schema = batch_builder.schema().as_ref().clone();

// Arrow schema of Parquet files may have conflicting metadata
// Since Arrow schema metadata is not used to generate Delta table schema, we set the metadata field to an empty HashMap
@@ -584,6 +603,15 @@ mod tests {
"part-00000-d22c627d-9655-4153-9527-f8995620fa42-c000.snappy.parquet"
);

let Some(Scalar::Struct(min_values, _)) = action.min_values() else {
panic!("Missing min values");
};
assert_eq!(min_values, vec![Scalar::Date(18628), Scalar::Integer(1)]);
let Some(Scalar::Struct(max_values, _)) = action.max_values() else {
panic!("Missing max values");
};
assert_eq!(max_values, vec![Scalar::Date(18632), Scalar::Integer(5)]);

assert_delta_table(
table,
path,
118 changes: 89 additions & 29 deletions crates/core/src/writer/stats.rs
@@ -4,6 +4,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, ops::AddAssign};

use indexmap::IndexMap;
use parquet::file::metadata::ParquetMetaData;
use parquet::format::FileMetaData;
use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
use parquet::{basic::LogicalType, errors::ParquetError};
@@ -66,6 +67,32 @@ pub fn create_add(
})
}

// As opposed to `stats_from_file_metadata`, which operates on `parquet::format::FileMetaData`,
// this function produces the stats by reading the metadata from files that have already been written out.
//
// Note that the file metadata used here is actually `parquet::file::metadata::FileMetaData`,
// a thrift decoding of the `parquet::format::FileMetaData` that is typically obtained
// when flushing the write.
pub(crate) fn stats_from_parquet_metadata(
partition_values: &IndexMap<String, Scalar>,
parquet_metadata: &ParquetMetaData,
num_indexed_cols: i32,
stats_columns: &Option<Vec<String>>,
) -> Result<Stats, DeltaWriterError> {
let num_rows = parquet_metadata.file_metadata().num_rows();
let schema_descriptor = parquet_metadata.file_metadata().schema_descr_ptr();
let row_group_metadata = parquet_metadata.row_groups().to_vec();

stats_from_metadata(
partition_values,
schema_descriptor,
row_group_metadata,
num_rows,
num_indexed_cols,
stats_columns,
)
}

fn stats_from_file_metadata(
partition_values: &IndexMap<String, Scalar>,
file_metadata: &FileMetaData,
@@ -75,27 +102,46 @@ fn stats_from_file_metadata(
let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice());
let schema_descriptor = type_ptr.map(|type_| Arc::new(SchemaDescriptor::new(type_)))?;

let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
let mut null_count: HashMap<String, ColumnCountStat> = HashMap::new();

let row_group_metadata: Result<Vec<RowGroupMetaData>, ParquetError> = file_metadata
let row_group_metadata: Vec<RowGroupMetaData> = file_metadata
.row_groups
.iter()
.map(|rg| RowGroupMetaData::from_thrift(schema_descriptor.clone(), rg.clone()))
.collect();
let row_group_metadata = row_group_metadata?;
let schema_cols = file_metadata
.schema
.iter()
.map(|v| &v.name)
.collect::<Vec<_>>();
.collect::<Result<Vec<RowGroupMetaData>, ParquetError>>()?;

stats_from_metadata(
partition_values,
schema_descriptor,
row_group_metadata,
file_metadata.num_rows,
num_indexed_cols,
stats_columns,
)
}

fn stats_from_metadata(
partition_values: &IndexMap<String, Scalar>,
schema_descriptor: Arc<SchemaDescriptor>,
row_group_metadata: Vec<RowGroupMetaData>,
num_rows: i64,
num_indexed_cols: i32,
stats_columns: &Option<Vec<String>>,
) -> Result<Stats, DeltaWriterError> {
let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
let mut null_count: HashMap<String, ColumnCountStat> = HashMap::new();

let idx_to_iterate = if let Some(stats_cols) = stats_columns {
stats_cols
.iter()
.map(|col| schema_cols[1..].iter().position(|value| *value == col))
.flatten()
schema_descriptor
.columns()
.into_iter()
.enumerate()
.filter_map(|(index, col)| {
if stats_cols.contains(&col.name().to_string()) {
Some(index)
} else {
None
}
})
.collect()
} else if num_indexed_cols == -1 {
(0..schema_descriptor.num_columns()).collect::<Vec<_>>()
@@ -149,7 +195,7 @@ fn stats_from_file_metadata(
Ok(Stats {
min_values,
max_values,
num_records: file_metadata.num_rows,
num_records: num_rows,
null_count,
})
}
@@ -262,18 +308,8 @@ impl StatsScalar {
v.max_bytes()
};

let val = if val.len() <= 4 {
let mut bytes = [0; 4];
bytes[..val.len()].copy_from_slice(val);
i32::from_be_bytes(bytes) as f64
} else if val.len() <= 8 {
let mut bytes = [0; 8];
bytes[..val.len()].copy_from_slice(val);
i64::from_be_bytes(bytes) as f64
} else if val.len() <= 16 {
let mut bytes = [0; 16];
bytes[..val.len()].copy_from_slice(val);
i128::from_be_bytes(bytes) as f64
let val = if val.len() <= 16 {
i128::from_be_bytes(sign_extend_be(val)) as f64
Comment on lines -265 to +312

@gruuya (Contributor, Author) commented on May 9, 2024:
Looks like this was causing those test failures.

In particular, note that the above would wrongly extend slices whose length is not a power of 2. This is the case that occurred for the failing tests, since this:

"decimal": pa.array([Decimal("10.000") + x for x in range(nrows)]),

would result in min [0, 39, 16] and max [0, 54, 176] fixed-length byte arrays, and these would in turn be coerced to [0, 39, 16, 0] and [0, 54, 176, 0] instead of [0, 0, 39, 16] and [0, 0, 54, 176], respectively.

Contrast with arrow-rs/datafusion where the extension happens at the beginning of the array:
https://github.com/apache/arrow-rs/blob/b25c441745602c9967b1e3cc4a28bc469cfb1311/parquet/src/arrow/array_reader/fixed_len_byte_array.rs#L170

Granted, I'm not sure what encodes the min value "10.000" as [0, 39, 16] in the first place, other than that it happens in pyarrow.
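
For illustration only (not part of the PR or the review thread), here is a minimal standalone Rust sketch of the padding difference described above, using the [0, 39, 16] bytes from the failing test:

```rust
fn main() {
    // 3-byte big-endian unscaled decimal: 0*65536 + 39*256 + 16 = 10000,
    // i.e. Decimal("10.000") at scale 3.
    let val: &[u8] = &[0, 39, 16];

    // Correct: pad (sign-extend) at the front, as sign_extend_be does.
    let mut front = [0u8; 4];
    front[4 - val.len()..].copy_from_slice(val);
    assert_eq!(i32::from_be_bytes(front), 10_000);

    // Incorrect (previous behaviour): pad at the back, which multiplies the
    // value by 256 for every padding byte appended.
    let mut back = [0u8; 4];
    back[..val.len()].copy_from_slice(val);
    assert_eq!(i32::from_be_bytes(back), 2_560_000);

    println!(
        "front-padded = {}, back-padded = {}",
        i32::from_be_bytes(front),
        i32::from_be_bytes(back)
    );
}
```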

@gruuya (Contributor, Author) commented:
Hi @ion-elgreco, can you approve the rest of the workflows?

} else {
return Err(DeltaWriterError::StatsParsingFailed {
debug_value: format!("{val:?}"),
Expand Down Expand Up @@ -315,6 +351,19 @@ impl StatsScalar {
}
}

/// Performs big endian sign extension
/// Copied from arrow-rs repo/parquet crate:
/// https://github.com/apache/arrow-rs/blob/b25c441745602c9967b1e3cc4a28bc469cfb1311/parquet/src/arrow/buffer/bit_util.rs#L54
pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
assert!(b.len() <= N, "Array too large, expected less than {N}");
let is_negative = (b[0] & 128u8) == 128u8;
let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
for (d, s) in result.iter_mut().skip(N - b.len()).zip(b) {
*d = *s;
}
result
}

impl From<StatsScalar> for serde_json::Value {
fn from(scalar: StatsScalar) -> Self {
match scalar {
Expand Down Expand Up @@ -653,6 +702,17 @@ mod tests {
}),
Value::from(1243124142314.423),
),
(
simple_parquet_stat!(
Statistics::FixedLenByteArray,
FixedLenByteArray::from(vec![0, 39, 16])
),
Some(LogicalType::Decimal {
scale: 3,
precision: 5,
}),
Value::from(10.0),
),
(
simple_parquet_stat!(
Statistics::FixedLenByteArray,
Expand Down