Commit e858cae

Merge branch 'main' into fix-num-rows-stat

rtyler authored Nov 22, 2024
2 parents 6faf599 + cb80931

Showing 8 changed files with 60 additions and 27 deletions.

2 changes: 1 addition & 1 deletion crates/aws/src/credentials.rs

@@ -4,7 +4,7 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::time::{Duration, SystemTime};
+use std::time::SystemTime;

 use aws_config::default_provider::credentials::DefaultCredentialsChain;
 use aws_config::meta::credentials::CredentialsProviderChain;

26 changes: 19 additions & 7 deletions crates/core/src/delta_datafusion/expr.rs

@@ -28,7 +28,8 @@ use arrow_schema::{DataType, Field};
 use chrono::{DateTime, NaiveDate};
 use datafusion::execution::context::SessionState;
 use datafusion::execution::session_state::SessionStateBuilder;
-use datafusion::functions_array::make_array::MakeArray;
+use datafusion::functions_nested::make_array::MakeArray;
+use datafusion::functions_nested::planner::{FieldAccessPlanner, NestedFunctionPlanner};
 use datafusion_common::Result as DFResult;
 use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
 use datafusion_expr::expr::InList;
@@ -104,6 +105,7 @@ impl ScalarUDFImpl for MakeParquetArray {
             data_type = arg.data_type();
         }

+        #[allow(deprecated)]
         match self.actual.invoke(args)? {
             ColumnarValue::Scalar(ScalarValue::List(df_array)) => {
                 let field = Arc::new(Field::new("element", data_type, true));
@@ -126,7 +128,7 @@
     }

     fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
-        self.actual.invoke_no_args(number_rows)
+        self.actual.invoke_batch(&[], number_rows)
     }

     fn aliases(&self) -> &[String] {
@@ -142,9 +144,7 @@
     }
 }

-use datafusion::functions_array::planner::{FieldAccessPlanner, NestedFunctionPlanner};
-
-/// This exists becxause the NestedFunctionPlanner _not_ the UserDefinedFunctionPlanner handles the
+/// This exists because the NestedFunctionPlanner, _not_ the UserDefinedFunctionPlanner, handles the
 /// insertion of "make_array" which is used to turn [100] into List<field=element, values=[100]>
 ///
 /// **screaming intensifies**
@@ -505,7 +505,7 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {
             ScalarValue::Date32(e) => match e {
                 Some(e) => write!(
                     f,
-                    "{}",
+                    "'{}'::date",
                     NaiveDate::from_num_days_from_ce_opt(EPOCH_DAYS_FROM_CE + (*e)).ok_or(Error)?
                 )?,
                 None => write!(f, "NULL")?,
@@ -567,8 +567,8 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {
 #[cfg(test)]
 mod test {
     use arrow_schema::DataType as ArrowDataType;
-    use datafusion::functions_array::expr_fn::cardinality;
     use datafusion::functions_nested::expr_ext::{IndexAccessor, SliceAccessor};
+    use datafusion::functions_nested::expr_fn::cardinality;
     use datafusion::prelude::SessionContext;
     use datafusion_common::{Column, ScalarValue, ToDFSchema};
     use datafusion_expr::expr::ScalarFunction;
@@ -875,6 +875,18 @@ mod test {
                 )
             )),
         },
+        ParseTest {
+            expr: col("_date").eq(lit(ScalarValue::Date32(Some(18262)))),
+            expected: "_date = '2020-01-01'::date".to_string(),
+            override_expected_expr: Some(col("_date").eq(
+                Expr::Cast(
+                    Cast {
+                        expr: Box::from(lit("2020-01-01")),
+                        data_type: arrow_schema::DataType::Date32
+                    }
+                )
+            )),
+        },
     ];

     let session: SessionContext = DeltaSessionContext::default().into();

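Note: recent DataFusion releases deprecate ScalarUDFImpl::invoke and invoke_no_args in favor of invoke_batch, which receives the argument columns together with the row count; a zero-argument call simply arrives as an empty slice, which is what the invoke_batch(&[], number_rows) change above relies on. A minimal sketch of the pattern, assuming DataFusion 43-era APIs (AnswerUdf is a hypothetical example type, not part of this commit):

use std::any::Any;

use arrow_schema::DataType;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};

#[derive(Debug)]
struct AnswerUdf {
    signature: Signature,
}

impl AnswerUdf {
    fn new() -> Self {
        Self {
            // Declares a nullary function: accepts exactly zero arguments.
            signature: Signature::exact(vec![], Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for AnswerUdf {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "answer"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int64)
    }

    // invoke_batch replaces both invoke() and invoke_no_args(): a
    // zero-argument call shows up as an empty args slice plus number_rows.
    fn invoke_batch(&self, args: &[ColumnarValue], _number_rows: usize) -> Result<ColumnarValue> {
        debug_assert!(args.is_empty());
        Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(42))))
    }
}
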
29 changes: 16 additions & 13 deletions crates/core/src/writer/stats.rs

@@ -252,9 +252,9 @@ impl StatsScalar {
         macro_rules! get_stat {
             ($val: expr) => {
                 if use_min {
-                    *$val.min()
+                    *$val.min_opt().unwrap()
                 } else {
-                    *$val.max()
+                    *$val.max_opt().unwrap()
                 }
             };
         }
@@ -304,10 +304,11 @@ impl StatsScalar {
             (Statistics::Double(v), _) => Ok(Self::Float64(get_stat!(v))),
             (Statistics::ByteArray(v), logical_type) => {
                 let bytes = if use_min {
-                    v.min_bytes()
+                    v.min_bytes_opt()
                 } else {
-                    v.max_bytes()
-                };
+                    v.max_bytes_opt()
+                }
+                .unwrap_or_default();
                 match logical_type {
                     None => Ok(Self::Bytes(bytes.to_vec())),
                     Some(LogicalType::String) => {
@@ -326,10 +327,11 @@
             }
             (Statistics::FixedLenByteArray(v), Some(LogicalType::Decimal { scale, precision })) => {
                 let val = if use_min {
-                    v.min_bytes()
+                    v.min_bytes_opt()
                 } else {
-                    v.max_bytes()
-                };
+                    v.max_bytes_opt()
+                }
+                .unwrap_or_default();

                 let val = if val.len() <= 16 {
                     i128::from_be_bytes(sign_extend_be(val)) as f64
@@ -356,10 +358,11 @@
             }
             (Statistics::FixedLenByteArray(v), Some(LogicalType::Uuid)) => {
                 let val = if use_min {
-                    v.min_bytes()
+                    v.min_bytes_opt()
                 } else {
-                    v.max_bytes()
-                };
+                    v.max_bytes_opt()
+                }
+                .unwrap_or_default();

                 if val.len() != 16 {
                     return Err(DeltaWriterError::StatsParsingFailed {
@@ -432,8 +435,8 @@ struct AggregatedStats {
 impl From<(&Statistics, &Option<LogicalType>)> for AggregatedStats {
     fn from(value: (&Statistics, &Option<LogicalType>)) -> Self {
         let (stats, logical_type) = value;
-        let null_count = stats.null_count();
-        if stats.has_min_max_set() {
+        let null_count = stats.null_count_opt().unwrap_or_default();
+        if stats.min_bytes_opt().is_some() && stats.max_bytes_opt().is_some() {
             let min = StatsScalar::try_from_stats(stats, logical_type, true).ok();
             let max = StatsScalar::try_from_stats(stats, logical_type, false).ok();
             Self {

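Note: parquet 53 replaced the panicking statistics accessors (min, max, min_bytes, max_bytes, null_count, has_min_max_set) with Option-returning *_opt variants that yield None when the writer did not record the statistic. A minimal sketch of the new shape, assuming parquet 53-era APIs (min_max_bytes is a hypothetical helper, not part of this commit):

use parquet::file::statistics::Statistics;

// Fetch the min or max bytes for byte-array statistics, treating absent
// values as empty instead of panicking.
fn min_max_bytes(stats: &Statistics, use_min: bool) -> Vec<u8> {
    match stats {
        Statistics::ByteArray(v) => {
            let bytes = if use_min {
                v.min_bytes_opt()
            } else {
                v.max_bytes_opt()
            }
            .unwrap_or_default();
            bytes.to_vec()
        }
        // The other physical types would need their own handling.
        _ => Vec::new(),
    }
}

The old has_min_max_set() gate correspondingly becomes the pair of is_some() checks seen in the AggregatedStats hunk above.
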
2 changes: 1 addition & 1 deletion crates/test/src/datafusion.rs

@@ -7,7 +7,7 @@ use std::sync::Arc;

 pub fn context_with_delta_table_factory() -> SessionContext {
     let cfg = RuntimeConfig::new();
-    let env = RuntimeEnv::new(cfg).unwrap();
+    let env = RuntimeEnv::try_new(cfg).unwrap();
     let ses = SessionConfig::new();
     let mut state = SessionStateBuilder::new()
         .with_config(ses)

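Note: DataFusion deprecated RuntimeEnv::new in favor of try_new, matching the Rust naming convention for fallible constructors; construction is otherwise unchanged. A minimal sketch, assuming DataFusion 43-era APIs:

use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

fn main() {
    // Same construction as the deprecated RuntimeEnv::new(cfg).
    let env = RuntimeEnv::try_new(RuntimeConfig::new()).expect("runtime env");
    let _ = env;
}
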
4 changes: 2 additions & 2 deletions python/src/features.rs

@@ -2,8 +2,8 @@ use deltalake::kernel::TableFeatures as KernelTableFeatures;
 use pyo3::pyclass;

 /// High level table features
-#[pyclass]
-#[derive(Clone)]
+#[pyclass(eq, eq_int)]
+#[derive(Clone, PartialEq)]
 pub enum TableFeatures {
     /// Mapping of one column to another
     ColumnMapping,

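Note: pyo3 0.22 deprecated the implicit integer-based comparison it used to generate for simple enums; #[pyclass(eq, eq_int)] now generates __eq__ from the Rust PartialEq impl (eq) and additionally allows comparison against plain Python integers (eq_int), which is why derive(PartialEq) is added alongside. A minimal sketch, assuming pyo3 0.22 (Color is a hypothetical enum, not part of this commit):

use pyo3::prelude::*;

// With eq and eq_int, Python code can write both
// Color.Red == Color.Red and Color.Red == 0.
#[pyclass(eq, eq_int)]
#[derive(Clone, PartialEq)]
enum Color {
    Red,
    Green,
}
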
4 changes: 4 additions & 0 deletions python/src/filesystem.rs

@@ -503,10 +503,12 @@ impl ObjectInputFile {
         Err(PyNotImplementedError::new_err("'truncate' not implemented"))
     }

+    #[pyo3(signature = (_size=None))]
     fn readline(&self, _size: Option<i64>) -> PyResult<()> {
         Err(PyNotImplementedError::new_err("'readline' not implemented"))
     }

+    #[pyo3(signature = (_hint=None))]
     fn readlines(&self, _hint: Option<i64>) -> PyResult<()> {
         Err(PyNotImplementedError::new_err(
             "'readlines' not implemented",
@@ -666,10 +668,12 @@ impl ObjectOutputStream {
         Err(PyNotImplementedError::new_err("'truncate' not implemented"))
     }

+    #[pyo3(signature = (_size=None))]
     fn readline(&self, _size: Option<i64>) -> PyResult<()> {
         Err(PyNotImplementedError::new_err("'readline' not implemented"))
     }

+    #[pyo3(signature = (_hint=None))]
     fn readlines(&self, _hint: Option<i64>) -> PyResult<()> {
         Err(PyNotImplementedError::new_err(
             "'readlines' not implemented",

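Note: pyo3 0.22 deprecates relying on trailing Option<T> arguments implicitly defaulting to None; the explicit #[pyo3(signature = ...)] attribute keeps calls like f.readline() working without arguments from Python, which is what the additions above and in python/src/lib.rs below are doing. A minimal sketch, assuming pyo3 0.22 (read_chunk is a hypothetical function, not part of this commit):

use pyo3::prelude::*;

// size=None in the signature restores the old calling convention:
// read_chunk() and read_chunk(10) are both valid from Python.
#[pyfunction]
#[pyo3(signature = (size=None))]
fn read_chunk(size: Option<usize>) -> PyResult<usize> {
    // Hypothetical body: pretend we read `size` bytes, defaulting to 1024.
    Ok(size.unwrap_or(1024))
}
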
15 changes: 13 additions & 2 deletions python/src/lib.rs

@@ -287,6 +287,7 @@ impl RawDeltaTable {
         })
     }

+    #[pyo3(signature = (partition_filters=None))]
     pub fn files(
         &self,
         py: Python,
@@ -316,6 +317,7 @@
         })
     }

+    #[pyo3(signature = (partition_filters=None))]
     pub fn file_uris(
         &self,
         partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
@@ -828,6 +830,7 @@
     }

     /// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
+    #[pyo3(signature = (limit=None))]
     pub fn history(&mut self, limit: Option<usize>) -> PyResult<Vec<String>> {
         let history = rt()
             .block_on(self._table.history(limit))
@@ -845,6 +848,7 @@
             .map_err(PythonError::from)?)
     }

+    #[pyo3(signature = (schema, partition_filters=None))]
     pub fn dataset_partitions<'py>(
         &mut self,
         py: Python<'py>,
@@ -876,6 +880,7 @@
             .collect()
     }

+    #[pyo3(signature = (partitions_filters=None))]
     fn get_active_partitions<'py>(
         &self,
         partitions_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
@@ -969,6 +974,7 @@
     }

     #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (add_actions, mode, partition_by, schema, partitions_filters=None, commit_properties=None, post_commithook_properties=None))]
     fn create_write_transaction(
         &mut self,
         py: Python,
@@ -1431,7 +1437,7 @@ fn scalar_to_py<'py>(value: &Scalar, py_date: &Bound<'py, PyAny>) -> PyResult<Bo
         Double(val) => val.to_object(py),
         Timestamp(_) => {
             // We need to manually append 'Z' add to end so that pyarrow can cast the
-            // the scalar value to pa.timestamp("us","UTC")
+            // scalar value to pa.timestamp("us","UTC")
             let value = value.serialize();
             format!("{}Z", value).to_object(py)
         }
@@ -1453,7 +1459,7 @@
             }
             py_struct.to_object(py)
         }
-        Array(val) => todo!("how should this be converted!"),
+        Array(_val) => todo!("how should this be converted!"),
     };

     Ok(val.into_bound(py))
@@ -1747,6 +1753,7 @@ pub struct PyCommitProperties {

 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (table_uri, data, mode, table=None, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
 fn write_to_deltalake(
     py: Python,
     table_uri: String,
@@ -1828,6 +1835,7 @@

 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (table_uri, schema, partition_by, mode, raise_if_key_not_exists, name=None, description=None, configuration=None, storage_options=None, custom_metadata=None))]
 fn create_deltalake(
     py: Python,
     table_uri: String,
@@ -1884,6 +1892,7 @@

 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (table_uri, schema, add_actions, _mode, partition_by, name=None, description=None, configuration=None, storage_options=None, custom_metadata=None))]
 fn write_new_deltalake(
     py: Python,
     table_uri: String,
@@ -1938,6 +1947,7 @@

 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
+#[pyo3(signature = (uri, partition_schema=None, partition_strategy=None, name=None, description=None, configuration=None, storage_options=None, custom_metadata=None))]
 fn convert_to_deltalake(
     py: Python,
     uri: String,
@@ -1992,6 +2002,7 @@
 }

 #[pyfunction]
+#[pyo3(signature = (table=None, configuration=None))]
 fn get_num_idx_cols_and_stats_columns(
     table: Option<&RawDeltaTable>,
     configuration: Option<HashMap<String, Option<String>>>,

5 changes: 4 additions & 1 deletion python/tests/test_merge.py

@@ -805,7 +805,10 @@ def test_merge_date_partitioned_2344(tmp_path: pathlib.Path):

     assert last_action["operation"] == "MERGE"
     assert result == data
-    assert last_action["operationParameters"].get("predicate") == "2022-02-01 = date"
+    assert (
+        last_action["operationParameters"].get("predicate")
+        == "'2022-02-01'::date = date"
+    )


 @pytest.mark.parametrize(
