Skip to content

Commit

Permalink
fix: include stats for all columns (#1223) (#1342)
Browse files Browse the repository at this point in the history
# Description
This is a proposal for how #1223 could be fixed.

# Related Issue(s)
- fixes #1223

# Documentation
The current implementation excludes all columns that lack statistical
information. The proposed fix will generate information for all columns,
with missing statistical values being replaced by 'null' values.
However, it is unclear if this is the correct behavior since the
`stats_as_batch` function lacks documentation.

Co-authored-by: Will Jones <[email protected]>
  • Loading branch information
mrjoe7 and wjones127 authored May 13, 2023
1 parent 5665eb1 commit 8a4b2b8
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 38 deletions.
126 changes: 88 additions & 38 deletions rust/src/table_state_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::DeltaTableError;
use crate::SchemaDataType;
use crate::SchemaTypeStruct;
use arrow::array::{
ArrayRef, BinaryArray, BooleanArray, Date32Array, Float64Array, Int64Array, StringArray,
StructArray, TimestampMicrosecondArray, TimestampMillisecondArray,
Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float64Array, Int64Array, NullArray,
StringArray, StructArray, TimestampMicrosecondArray, TimestampMillisecondArray,
};
use arrow::compute::cast;
use arrow::compute::kernels::cast_utils::Parser;
Expand Down Expand Up @@ -304,53 +304,74 @@ impl DeltaTableState {
max_values: Option<ArrayRef>,
}

// Predicate for `Iterator::filter`: decides whether a column's stats entry is kept.
// Errors are always kept so that the later `collect` into a `Result` can report them.
// An `Ok` entry is kept only if at least one of its three stat arrays is present
// and contains at least one non-null entry.
let filter_out_empty_stats = |stats: &Result<ColStats, DeltaTableError>| -> bool {
    // A stat field "has values" when the array exists and not every slot is null.
    let has_values = |field: &Option<ArrayRef>| -> bool {
        field
            .as_ref()
            .map_or(false, |values| values.len() != values.null_count())
    };

    match stats {
        Ok(col_stats) => {
            has_values(&col_stats.null_count)
                || has_values(&col_stats.min_values)
                || has_values(&col_stats.max_values)
        }
        // keep errs
        Err(_) => true,
    }
};

let mut columnar_stats: Vec<ColStats> = SchemaLeafIterator::new(schema)
.filter(|(_path, datatype)| !matches!(datatype, SchemaDataType::r#struct(_)))
.map(|(path, datatype)| -> Result<ColStats, DeltaTableError> {
let null_count: Option<ArrayRef> = stats
let null_count = stats
.iter()
.flat_map(|maybe_stat| {
maybe_stat
.as_ref()
.map(|stat| resolve_column_count_stat(&stat.null_count, &path))
})
.collect::<Option<Vec<DeltaDataTypeLong>>>()
.map(arrow::array::Int64Array::from)
.map(|arr| -> ArrayRef { Arc::new(arr) });
.collect::<Vec<Option<DeltaDataTypeLong>>>();
let null_count = Some(value_vec_to_array(null_count, |values| {
Ok(Arc::new(arrow::array::Int64Array::from(values)))
})?);

let arrow_type: arrow::datatypes::DataType = datatype.try_into()?;

// Min and max are collected for primitive values, not list or maps
let min_values = if matches!(datatype, SchemaDataType::primitive(_)) {
stats
let min_values = stats
.iter()
.flat_map(|maybe_stat| {
maybe_stat
.as_ref()
.map(|stat| resolve_column_value_stat(&stat.min_values, &path))
})
.collect::<Option<Vec<&serde_json::Value>>>()
.map(|min_values| {
json_value_to_array_general(&arrow_type, min_values.into_iter())
})
.transpose()?
.collect::<Vec<Option<&serde_json::Value>>>();

Some(value_vec_to_array(min_values, |values| {
json_value_to_array_general(&arrow_type, values.into_iter())
})?)
} else {
None
};

let max_values = if matches!(datatype, SchemaDataType::primitive(_)) {
stats
let max_values = stats
.iter()
.flat_map(|maybe_stat| {
maybe_stat
.as_ref()
.map(|stat| resolve_column_value_stat(&stat.max_values, &path))
})
.collect::<Option<Vec<&serde_json::Value>>>()
.map(|max_values| {
json_value_to_array_general(&arrow_type, max_values.into_iter())
})
.transpose()?
.collect::<Vec<Option<&serde_json::Value>>>();
Some(value_vec_to_array(max_values, |values| {
json_value_to_array_general(&arrow_type, values.into_iter())
})?)
} else {
None
};
Expand All @@ -362,6 +383,7 @@ impl DeltaTableState {
max_values,
})
})
.filter(filter_out_empty_stats)
.collect::<Result<_, DeltaTableError>>()?;

let mut out_columns: Vec<(Cow<str>, ArrayRef)> =
Expand Down Expand Up @@ -407,7 +429,7 @@ impl DeltaTableState {
.last()
.expect("paths must have at least one element"),
values.data_type().clone(),
false,
true,
);
Some((field, Arc::clone(values)))
} else {
Expand Down Expand Up @@ -480,6 +502,20 @@ impl DeltaTableState {
}
}

/// Converts a vector of optional values into an Arrow array.
///
/// When no entry holds a value (this includes an empty vector), the result is a
/// [`NullArray`] with the same length as `value_vec`, and `map_fn` is never called.
/// Otherwise the whole vector is handed to `map_fn`, and whatever it returns
/// (an array or an error) is passed back unchanged.
fn value_vec_to_array<T, F>(
    value_vec: Vec<Option<T>>,
    map_fn: F,
) -> Result<ArrayRef, DeltaTableError>
where
    F: FnOnce(Vec<Option<T>>) -> Result<ArrayRef, DeltaTableError>,
{
    let has_any_value = value_vec.iter().any(Option::is_some);
    if has_any_value {
        return map_fn(value_vec);
    }

    // Nothing to convert: emit an untyped all-null column of matching length.
    let all_null: ArrayRef = Arc::new(NullArray::new(value_vec.len()));
    Ok(all_null)
}

fn resolve_column_value_stat<'a>(
values: &'a HashMap<String, ColumnValueStat>,
path: &[&'a str],
Expand Down Expand Up @@ -546,41 +582,55 @@ impl<'a> std::iter::Iterator for SchemaLeafIterator<'a> {

fn json_value_to_array_general<'a>(
datatype: &arrow::datatypes::DataType,
values: impl Iterator<Item = &'a serde_json::Value>,
values: impl Iterator<Item = Option<&'a serde_json::Value>>,
) -> Result<ArrayRef, DeltaTableError> {
match datatype {
DataType::Boolean => Ok(Arc::new(
DataType::Boolean => Ok(Arc::new(BooleanArray::from(
values
.map(|value| value.as_bool())
.collect::<BooleanArray>(),
)),
.map(|value| value.and_then(serde_json::Value::as_bool))
.collect_vec(),
))),
DataType::Int64 | DataType::Int32 | DataType::Int16 | DataType::Int8 => {
let i64_arr: ArrayRef =
Arc::new(values.map(|value| value.as_i64()).collect::<Int64Array>());
let i64_arr: ArrayRef = Arc::new(Int64Array::from(
values
.map(|value| value.and_then(serde_json::Value::as_i64))
.collect_vec(),
));
Ok(arrow::compute::cast(&i64_arr, datatype)?)
}
DataType::Float32 | DataType::Float64 | DataType::Decimal128(_, _) => {
let f64_arr: ArrayRef =
Arc::new(values.map(|value| value.as_f64()).collect::<Float64Array>());
let f64_arr: ArrayRef = Arc::new(Float64Array::from(
values
.map(|value| value.and_then(serde_json::Value::as_f64))
.collect_vec(),
));
Ok(arrow::compute::cast(&f64_arr, datatype)?)
}
DataType::Utf8 => Ok(Arc::new(
values.map(|value| value.as_str()).collect::<StringArray>(),
)),
DataType::Binary => Ok(Arc::new(
values.map(|value| value.as_str()).collect::<BinaryArray>(),
)),
DataType::Utf8 => Ok(Arc::new(StringArray::from(
values
.map(|value| value.and_then(serde_json::Value::as_str))
.collect_vec(),
))),
DataType::Binary => Ok(Arc::new(BinaryArray::from(
values
.map(|value| value.and_then(|value| value.as_str().map(|value| value.as_bytes())))
.collect_vec(),
))),
DataType::Timestamp(TimeUnit::Microsecond, None) => {
Ok(Arc::new(TimestampMicrosecondArray::from(
values
.map(|value| value.as_str().and_then(TimestampMicrosecondType::parse))
.collect::<Vec<Option<i64>>>(),
.map(|value| {
value.and_then(|value| {
value.as_str().and_then(TimestampMicrosecondType::parse)
})
})
.collect_vec(),
)))
}
DataType::Date32 => Ok(Arc::new(Date32Array::from(
values
.map(|value| value.as_str().and_then(Date32Type::parse))
.collect::<Vec<Option<i32>>>(),
.map(|value| value.and_then(|value| value.as_str().and_then(Date32Type::parse)))
.collect_vec(),
))),
_ => Err(DeltaTableError::Generic("Invalid datatype".to_string())),
}
Expand Down
6 changes: 6 additions & 0 deletions rust/tests/add_actions_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,14 @@ async fn test_only_struct_stats() {
"null_count.null",
Arc::new(array::Int64Array::from(vec![1])),
),
("min.null", Arc::new(array::NullArray::new(1))),
("max.null", Arc::new(array::NullArray::new(1))),
(
"null_count.boolean",
Arc::new(array::Int64Array::from(vec![0])),
),
("min.boolean", Arc::new(array::NullArray::new(1))),
("max.boolean", Arc::new(array::NullArray::new(1))),
(
"null_count.double",
Arc::new(array::Int64Array::from(vec![0])),
Expand Down Expand Up @@ -256,6 +260,8 @@ async fn test_only_struct_stats() {
"null_count.binary",
Arc::new(array::Int64Array::from(vec![0])),
),
("min.binary", Arc::new(array::NullArray::new(1))),
("max.binary", Arc::new(array::NullArray::new(1))),
(
"null_count.date",
Arc::new(array::Int64Array::from(vec![0])),
Expand Down

0 comments on commit 8a4b2b8

Please sign in to comment.