diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs
index 515143f088..676098c832 100644
--- a/crates/core/src/operations/mod.rs
+++ b/crates/core/src/operations/mod.rs
@@ -283,6 +283,22 @@ pub fn get_num_idx_cols_and_stats_columns(
     )
 }
 
+/// Get the target_file_size from the table configuration in the state.
+/// If table_config does not exist (this can only occur on the first write action), it takes
+/// the configuration that was passed to the WriteBuilder.
+pub(crate) fn get_target_file_size(
+    config: &Option<crate::table::config::TableConfig<'_>>,
+    configuration: &HashMap<String, Option<String>>,
+) -> i64 {
+    match &config {
+        Some(conf) => conf.target_file_size(),
+        _ => configuration
+            .get("delta.targetFileSize")
+            .and_then(|v| v.clone().map(|v| v.parse::<i64>().unwrap()))
+            .unwrap_or(crate::table::config::DEFAULT_TARGET_FILE_SIZE),
+    }
+}
+
 #[cfg(feature = "datafusion")]
 mod datafusion_utils {
     use datafusion::execution::context::SessionState;
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index b3f0f30c1b..ed5a3c9a2f 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -398,7 +398,6 @@ async fn write_execution_plan_with_predicate(
         }
         _ => checker,
     };
-
     // Write data to disk
     let mut tasks = vec![];
     for i in 0..plan.properties().output_partitioning().partition_count() {
@@ -977,6 +976,9 @@ impl std::future::IntoFuture for WriteBuilder {
                     .as_ref()
                     .map(|snapshot| snapshot.table_config());
 
+                let target_file_size = this.target_file_size.or_else(|| {
+                    Some(super::get_target_file_size(&config, &this.configuration) as usize)
+                });
                 let (num_indexed_cols, stats_columns) =
                     super::get_num_idx_cols_and_stats_columns(config, this.configuration);
 
@@ -984,6 +986,7 @@ impl std::future::IntoFuture for WriteBuilder {
                     num_indexed_cols,
                     stats_columns,
                 };
+
                 // Here we need to validate if the new data conforms to a predicate if one is provided
                 let add_actions = write_execution_plan_with_predicate(
                     predicate.clone(),
@@ -992,7 +995,7 @@ impl std::future::IntoFuture for WriteBuilder {
                     plan.clone(),
                     partition_columns.clone(),
                     this.log_store.object_store().clone(),
-                    this.target_file_size,
+                    target_file_size,
                     this.write_batch_size,
                     this.writer_properties.clone(),
                     writer_stats_config.clone(),
diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs
index 47307cfecd..68b41d6f67 100644
--- a/crates/core/src/table/config.rs
+++ b/crates/core/src/table/config.rs
@@ -210,6 +210,8 @@ pub struct TableConfig<'a>(pub(crate) &'a HashMap<String, Option<String>>);
 
 /// Default num index cols
 pub const DEFAULT_NUM_INDEX_COLS: i32 = 32;
+/// Default target file size
+pub const DEFAULT_TARGET_FILE_SIZE: i64 = 104857600;
 
 impl<'a> TableConfig<'a> {
     table_config!(
diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi
index 5aedd5e162..ceac16e7f8 100644
--- a/python/deltalake/_internal.pyi
+++ b/python/deltalake/_internal.pyi
@@ -200,6 +200,7 @@ def write_to_deltalake(
     table: Optional[RawDeltaTable],
     schema_mode: Optional[str],
     predicate: Optional[str],
+    target_file_size: Optional[int],
     name: Optional[str],
     description: Optional[str],
     configuration: Optional[Mapping[str, Optional[str]]],
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 95368dbf79..e08d9cc9b8 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -178,6 +178,7 @@ def write_deltalake(
     schema_mode: Optional[Literal["merge", "overwrite"]] = ...,
     storage_options: Optional[Dict[str, str]] = ...,
     predicate: Optional[str] = ...,
+    target_file_size: Optional[int] = ...,
     large_dtypes: bool = ...,
     engine: Literal["rust"] = ...,
     writer_properties: WriterProperties = ...,
@@ -214,6 +215,7 @@ def write_deltalake(
     storage_options: Optional[Dict[str, str]] = None,
     partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
     predicate: Optional[str] = None,
+    target_file_size: Optional[int] = None,
     large_dtypes: bool = False,
     engine: Literal["pyarrow", "rust"] = "rust",
     writer_properties: Optional[WriterProperties] = None,
@@ -267,7 +269,8 @@ def write_deltalake(
         configuration: A map containing configuration options for the metadata action.
         schema_mode: If set to "overwrite", allows replacing the schema of the table. Set to "merge" to merge with existing schema.
         storage_options: options passed to the native delta filesystem.
         predicate: When using `Overwrite` mode, replace data that matches a predicate. Only used in rust engine.
+        target_file_size: Override for target file size for data files written to the delta table. If not passed, it's taken from `delta.targetFileSize`.
         partition_filters: the partition filters that will be used for partition overwrite. Only used in pyarrow engine.
         large_dtypes: Only used for pyarrow engine
         engine: writer engine to write the delta table. PyArrow engine is deprecated, and will be removed in v1.0.
@@ -308,6 +311,7 @@ def write_deltalake(
             table=table._table if table is not None else None,
             schema_mode=schema_mode,
             predicate=predicate,
+            target_file_size=target_file_size,
             name=name,
             description=description,
             configuration=configuration,
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 787d321d08..aeb1b3c429 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -1602,6 +1602,7 @@ fn write_to_deltalake(
     schema_mode: Option<String>,
     partition_by: Option<Vec<String>>,
     predicate: Option<String>,
+    target_file_size: Option<usize>,
     name: Option<String>,
     description: Option<String>,
     configuration: Option<HashMap<String, Option<String>>>,
@@ -1650,6 +1651,10 @@ fn write_to_deltalake(
         builder = builder.with_replace_where(predicate);
     };
 
+    if let Some(target_file_size) = target_file_size {
+        builder = builder.with_target_file_size(target_file_size)
+    };
+
     if let Some(config) = configuration {
         builder = builder.with_configuration(config);
     };
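
For reviewers, a minimal sketch of the new user-facing parameter once this diff is applied; the table path and data below are illustrative, not part of the change:

```python
import pyarrow as pa
from deltalake import write_deltalake

data = pa.table({"id": list(range(100_000))})

# Ask the rust-engine writer to roll data files at roughly 32 MiB instead
# of the 104857600-byte (100 MiB) DEFAULT_TARGET_FILE_SIZE fallback.
write_deltalake(
    "tmp/example_table",  # illustrative path
    data,
    mode="append",
    engine="rust",
    target_file_size=32 * 1024 * 1024,
)
```

The override takes effect only on the rust path, since it is plumbed through `write_to_deltalake` into `WriteBuilder::with_target_file_size`.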
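The resolution order implemented by `get_target_file_size` is: explicit `target_file_size` argument, then the table's `delta.targetFileSize` configuration, then `DEFAULT_TARGET_FILE_SIZE` (104857600 bytes). A sketch of the config-driven path, assuming the path and the 64 MiB value are made up for illustration:

```python
import pyarrow as pa
from deltalake import write_deltalake

data = pa.table({"id": list(range(100_000))})

# First write creates the table with delta.targetFileSize in its metadata.
write_deltalake(
    "tmp/config_table",
    data,
    configuration={"delta.targetFileSize": str(64 * 1024 * 1024)},
)

# Later writes pass no target_file_size, so get_target_file_size reads
# delta.targetFileSize from the table config instead of the 100 MiB default.
write_deltalake("tmp/config_table", data, mode="append")
```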