Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python, rust): respect column stats collection configurations #2428

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
570b9fb
make stats_cols and num_index accessible in python
ion-elgreco Apr 21, 2024
8241707
respect file stats column collection configs
ion-elgreco Apr 21, 2024
e10f8be
Collect table stats according to config in rust writer
ion-elgreco Apr 21, 2024
68c7817
rm dbgs
ion-elgreco Apr 21, 2024
82e4e40
clippy
ion-elgreco Apr 21, 2024
2b04a13
add tests
ion-elgreco Apr 21, 2024
faa57f0
lint
ion-elgreco Apr 21, 2024
6c4908b
add missing params
ion-elgreco Apr 21, 2024
fec7f7e
handle -1
ion-elgreco Apr 21, 2024
3efe20b
Merge branch 'main' into feat/respect_column_stats_collection
rtyler Apr 22, 2024
9b6028f
Merge branch 'main' into feat/respect_column_stats_collection
ion-elgreco May 2, 2024
11c5e3e
fmt and simplify
ion-elgreco May 2, 2024
9c66dac
Merge branch 'main' into feat/respect_column_stats_collection
ion-elgreco May 6, 2024
b3aba76
Merge branch 'main' into feat/respect_column_stats_collection
ion-elgreco May 7, 2024
8aba9ca
Merge branch 'main' into feat/respect_column_stats_collection
ion-elgreco May 10, 2024
c943ab8
create vec of idx first
ion-elgreco May 10, 2024
71d9da5
fmt
ion-elgreco May 10, 2024
d2586bb
rewrite config grabbing
ion-elgreco May 10, 2024
f2662d0
fmt
ion-elgreco May 10, 2024
dc41b20
rm print
ion-elgreco May 10, 2024
56b0a3e
add by accident removed lines back
ion-elgreco May 10, 2024
384edc5
Merge branch 'main' into feat/respect_column_stats_collection
ion-elgreco May 10, 2024
3af484b
Merge branch 'main' into feat/respect_column_stats_collection
ion-elgreco May 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use serde::Serialize;

use super::datafusion_utils::Expression;
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
use super::write::WriterStatsConfig;
use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{
create_physical_expr_fix, find_files, register_store, DataFusionMixins, DeltaScanBuilder,
Expand Down Expand Up @@ -153,6 +154,14 @@ async fn excute_non_empty_expr(
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);

let writer_stats_config = WriterStatsConfig::new(
snapshot.table_config().num_indexed_cols(),
snapshot
.table_config()
.stats_columns()
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
);

let add_actions = write_execution_plan(
Some(snapshot),
state.clone(),
Expand All @@ -164,6 +173,7 @@ async fn excute_non_empty_expr(
writer_properties,
false,
None,
writer_stats_config,
)
.await?
.into_iter()
Expand Down
11 changes: 10 additions & 1 deletion crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ use crate::kernel::Action;
use crate::logstore::LogStoreRef;
use crate::operations::merge::barrier::find_barrier_node;
use crate::operations::transaction::CommitBuilder;
use crate::operations::write::write_execution_plan;
use crate::operations::write::{write_execution_plan, WriterStatsConfig};
use crate::protocol::{DeltaOperation, MergePredicate};
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTable, DeltaTableError};
Expand Down Expand Up @@ -1368,6 +1368,14 @@ async fn execute(
// write projected records
let table_partition_cols = current_metadata.partition_columns.clone();

let writer_stats_config = WriterStatsConfig::new(
snapshot.table_config().num_indexed_cols(),
snapshot
.table_config()
.stats_columns()
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
);

let rewrite_start = Instant::now();
let add_actions = write_execution_plan(
Some(&snapshot),
Expand All @@ -1380,6 +1388,7 @@ async fn execute(
writer_properties,
safe_cast,
None,
writer_stats_config,
)
.await?;

Expand Down
16 changes: 15 additions & 1 deletion crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,10 @@ pub struct MergeTaskParameters {
file_schema: ArrowSchemaRef,
/// Properties passed to parquet writer
writer_properties: WriterProperties,
/// Num index cols to collect stats for
num_indexed_cols: i32,
/// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols
stats_columns: Option<Vec<String>>,
}

/// A stream of record batches, with a ParquetError on failure.
Expand Down Expand Up @@ -483,7 +487,12 @@ impl MergePlan {
Some(task_parameters.input_parameters.target_size as usize),
None,
)?;
let mut writer = PartitionWriter::try_with_config(object_store, writer_config)?;
let mut writer = PartitionWriter::try_with_config(
object_store,
writer_config,
task_parameters.num_indexed_cols,
task_parameters.stats_columns.clone(),
)?;

let mut read_stream = read_stream.await?;

Expand Down Expand Up @@ -841,6 +850,11 @@ pub fn create_merge_plan(
input_parameters,
file_schema,
writer_properties,
num_indexed_cols: snapshot.table_config().num_indexed_cols(),
stats_columns: snapshot
.table_config()
.stats_columns()
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
}),
read_table_version: snapshot.version(),
})
Expand Down
11 changes: 10 additions & 1 deletion crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ use futures::future::BoxFuture;
use parquet::file::properties::WriterProperties;
use serde::Serialize;

use super::transaction::PROTOCOL;
use super::write::write_execution_plan;
use super::{
datafusion_utils::Expression,
transaction::{CommitBuilder, CommitProperties},
};
use super::{transaction::PROTOCOL, write::WriterStatsConfig};
use crate::delta_datafusion::{
create_physical_expr_fix, expr::fmt_expr_to_sql, physical::MetricObserverExec,
DataFusionMixins, DeltaColumn, DeltaSessionContext,
Expand Down Expand Up @@ -348,6 +348,14 @@ async fn execute(
projection_update.clone(),
)?);

let writer_stats_config = WriterStatsConfig::new(
snapshot.table_config().num_indexed_cols(),
snapshot
.table_config()
.stats_columns()
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
);

let add_actions = write_execution_plan(
Some(&snapshot),
state.clone(),
Expand All @@ -359,6 +367,7 @@ async fn execute(
writer_properties,
safe_cast,
None,
writer_stats_config,
)
.await?;

Expand Down
56 changes: 56 additions & 0 deletions crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::logstore::LogStoreRef;
use crate::operations::cast::{cast_record_batch, merge_schema};
use crate::protocol::{DeltaOperation, SaveMode};
use crate::storage::ObjectStoreRef;
use crate::table::config::DEFAULT_NUM_INDEX_COLS;
use crate::table::state::DeltaTableState;
use crate::table::Constraint as DeltaConstraint;
use crate::writer::record_batch::divide_by_partition_values;
Expand Down Expand Up @@ -337,6 +338,24 @@ impl WriteBuilder {
}
}
}
/// Configuration for the writer on how to collect stats
#[derive(Debug, Clone)]
pub struct WriterStatsConfig {
    /// Number of leading columns to collect stats for, index based
    num_indexed_cols: i32,
    /// Optional list of columns which to collect stats for, takes precedence over num_indexed_cols
    stats_columns: Option<Vec<String>>,
}

impl WriterStatsConfig {
    /// Create new writer stats config.
    ///
    /// * `num_indexed_cols` - number of leading columns to collect stats for (index based).
    /// * `stats_columns` - explicit column names to collect stats for; when `Some`,
    ///   takes precedence over `num_indexed_cols`.
    pub fn new(num_indexed_cols: i32, stats_columns: Option<Vec<String>>) -> Self {
        Self {
            num_indexed_cols,
            stats_columns,
        }
    }
}

#[allow(clippy::too_many_arguments)]
async fn write_execution_plan_with_predicate(
Expand All @@ -351,6 +370,7 @@ async fn write_execution_plan_with_predicate(
writer_properties: Option<WriterProperties>,
safe_cast: bool,
schema_mode: Option<SchemaMode>,
writer_stats_config: WriterStatsConfig,
) -> DeltaResult<Vec<Action>> {
let schema: ArrowSchemaRef = if schema_mode.is_some() {
plan.schema()
Expand Down Expand Up @@ -386,6 +406,8 @@ async fn write_execution_plan_with_predicate(
writer_properties.clone(),
target_file_size,
write_batch_size,
writer_stats_config.num_indexed_cols,
writer_stats_config.stats_columns.clone(),
);
let mut writer = DeltaWriter::new(object_store.clone(), config);
let checker_stream = checker.clone();
Expand Down Expand Up @@ -438,6 +460,7 @@ pub(crate) async fn write_execution_plan(
writer_properties: Option<WriterProperties>,
safe_cast: bool,
schema_mode: Option<SchemaMode>,
writer_stats_config: WriterStatsConfig,
) -> DeltaResult<Vec<Action>> {
write_execution_plan_with_predicate(
None,
Expand All @@ -451,10 +474,12 @@ pub(crate) async fn write_execution_plan(
writer_properties,
safe_cast,
schema_mode,
writer_stats_config,
)
.await
}

#[allow(clippy::too_many_arguments)]
async fn execute_non_empty_expr(
snapshot: &DeltaTableState,
log_store: LogStoreRef,
Expand All @@ -463,6 +488,7 @@ async fn execute_non_empty_expr(
expression: &Expr,
rewrite: &[Add],
writer_properties: Option<WriterProperties>,
writer_stats_config: WriterStatsConfig,
) -> DeltaResult<Vec<Action>> {
// For each identified file perform a parquet scan + filter + limit (1) + count.
// If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.
Expand Down Expand Up @@ -496,13 +522,15 @@ async fn execute_non_empty_expr(
writer_properties,
false,
None,
writer_stats_config,
)
.await?;

Ok(add_actions)
}

// This should only be called with a valid predicate
#[allow(clippy::too_many_arguments)]
async fn prepare_predicate_actions(
predicate: Expr,
log_store: LogStoreRef,
Expand All @@ -511,6 +539,7 @@ async fn prepare_predicate_actions(
partition_columns: Vec<String>,
writer_properties: Option<WriterProperties>,
deletion_timestamp: i64,
writer_stats_config: WriterStatsConfig,
) -> DeltaResult<Vec<Action>> {
let candidates =
find_files(snapshot, log_store.clone(), &state, Some(predicate.clone())).await?;
Expand All @@ -526,6 +555,7 @@ async fn prepare_predicate_actions(
&predicate,
&candidates.candidates,
writer_properties,
writer_stats_config,
)
.await?
};
Expand Down Expand Up @@ -723,6 +753,30 @@ impl std::future::IntoFuture for WriteBuilder {
_ => (None, None),
};

let config = this
.snapshot
.as_ref()
.map(|snapshot| snapshot.table_config());

let (num_index_cols, stats_columns) = match &config {
Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()),
_ => (
this.configuration
.get("delta.dataSkippingNumIndexedCols")
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(DEFAULT_NUM_INDEX_COLS),
this.configuration
.get("delta.dataSkippingStatsColumns")
.and_then(|v| v.as_ref().map(|v| v.split(',').collect::<Vec<&str>>())),
),
};

let writer_stats_config = WriterStatsConfig {
num_indexed_cols: num_index_cols,
stats_columns: stats_columns
.clone()
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
};
// Here we need to validate if the new data conforms to a predicate if one is provided
let add_actions = write_execution_plan_with_predicate(
predicate.clone(),
Expand All @@ -736,6 +790,7 @@ impl std::future::IntoFuture for WriteBuilder {
this.writer_properties.clone(),
this.safe_cast,
this.schema_mode,
writer_stats_config.clone(),
)
.await?;
actions.extend(add_actions);
Expand Down Expand Up @@ -772,6 +827,7 @@ impl std::future::IntoFuture for WriteBuilder {
partition_columns.clone(),
this.writer_properties,
deletion_timestamp,
writer_stats_config,
)
.await?;
if !predicate_actions.is_empty() {
Expand Down
Loading
Loading