Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use table config target file size, expose target_file_size in python #2811

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,22 @@ pub fn get_num_idx_cols_and_stats_columns(
)
}

/// Get the target_file_size from the table configuration in the sates
/// If table_config does not exist (only can occur in the first write action) it takes
/// the configuration that was passed to the writerBuilder.
pub(crate) fn get_target_file_size(
config: &Option<crate::table::config::TableConfig<'_>>,
configuration: &HashMap<String, Option<String>>,
) -> i64 {
match &config {
Some(conf) => conf.target_file_size(),
_ => configuration
.get("delta.targetFileSize")
.and_then(|v| v.clone().map(|v| v.parse::<i64>().unwrap()))
.unwrap_or(crate::table::config::DEFAULT_TARGET_FILE_SIZE),
}
}

#[cfg(feature = "datafusion")]
mod datafusion_utils {
use datafusion::execution::context::SessionState;
Expand Down
7 changes: 5 additions & 2 deletions crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,6 @@ async fn write_execution_plan_with_predicate(
}
_ => checker,
};

// Write data to disk
let mut tasks = vec![];
for i in 0..plan.properties().output_partitioning().partition_count() {
Expand Down Expand Up @@ -977,13 +976,17 @@ impl std::future::IntoFuture for WriteBuilder {
.as_ref()
.map(|snapshot| snapshot.table_config());

let target_file_size = this.target_file_size.or_else(|| {
Some(super::get_target_file_size(&config, &this.configuration) as usize)
});
let (num_indexed_cols, stats_columns) =
super::get_num_idx_cols_and_stats_columns(config, this.configuration);

let writer_stats_config = WriterStatsConfig {
num_indexed_cols,
stats_columns,
};

// Here we need to validate if the new data conforms to a predicate if one is provided
let add_actions = write_execution_plan_with_predicate(
predicate.clone(),
Expand All @@ -992,7 +995,7 @@ impl std::future::IntoFuture for WriteBuilder {
plan.clone(),
partition_columns.clone(),
this.log_store.object_store().clone(),
this.target_file_size,
target_file_size,
this.write_batch_size,
this.writer_properties.clone(),
writer_stats_config.clone(),
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/table/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ pub struct TableConfig<'a>(pub(crate) &'a HashMap<String, Option<String>>);

/// Default num index cols
pub const DEFAULT_NUM_INDEX_COLS: i32 = 32;
/// Default target file size
pub const DEFAULT_TARGET_FILE_SIZE: i64 = 104857600;

impl<'a> TableConfig<'a> {
table_config!(
Expand Down
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def write_to_deltalake(
table: Optional[RawDeltaTable],
schema_mode: Optional[str],
predicate: Optional[str],
target_file_size: Optional[int],
name: Optional[str],
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
Expand Down
6 changes: 5 additions & 1 deletion python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def write_deltalake(
schema_mode: Optional[Literal["merge", "overwrite"]] = ...,
storage_options: Optional[Dict[str, str]] = ...,
predicate: Optional[str] = ...,
target_file_size: Optional[int] = ...,
large_dtypes: bool = ...,
engine: Literal["rust"] = ...,
writer_properties: WriterProperties = ...,
Expand Down Expand Up @@ -214,6 +215,7 @@ def write_deltalake(
storage_options: Optional[Dict[str, str]] = None,
partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
predicate: Optional[str] = None,
target_file_size: Optional[int] = None,
large_dtypes: bool = False,
engine: Literal["pyarrow", "rust"] = "rust",
writer_properties: Optional[WriterProperties] = None,
Expand Down Expand Up @@ -267,7 +269,8 @@ def write_deltalake(
configuration: A map containing configuration options for the metadata action.
schema_mode: If set to "overwrite", allows replacing the schema of the table. Set to "merge" to merge with existing schema.
storage_options: options passed to the native delta filesystem.
predicate: When using `Overwrite` mode, replace data that matches a predicate. Only used in rust engine.
predicate: When using `Overwrite` mode, replace data that matches a predicate. Only used in rust engine.'
target_file_size: Override for target file size for data files written to the delta table. If not passed, it's taken from `delta.targetFileSize`.
partition_filters: the partition filters that will be used for partition overwrite. Only used in pyarrow engine.
large_dtypes: Only used for pyarrow engine
engine: writer engine to write the delta table. PyArrow engine is deprecated, and will be removed in v1.0.
Expand Down Expand Up @@ -308,6 +311,7 @@ def write_deltalake(
table=table._table if table is not None else None,
schema_mode=schema_mode,
predicate=predicate,
target_file_size=target_file_size,
name=name,
description=description,
configuration=configuration,
Expand Down
5 changes: 5 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,7 @@ fn write_to_deltalake(
schema_mode: Option<String>,
partition_by: Option<Vec<String>>,
predicate: Option<String>,
target_file_size: Option<usize>,
name: Option<String>,
description: Option<String>,
configuration: Option<HashMap<String, Option<String>>>,
Expand Down Expand Up @@ -1650,6 +1651,10 @@ fn write_to_deltalake(
builder = builder.with_replace_where(predicate);
};

if let Some(target_file_size) = target_file_size {
builder = builder.with_target_file_size(target_file_size)
};

if let Some(config) = configuration {
builder = builder.with_configuration(config);
};
Expand Down
Loading