Schema evolution mergeSchema support #1386
Comments
I am looking for the exact same functionality in delta-rs! From the current docs, it seems impossible to add/drop columns on a Delta table without impacting existing data. I hope delta-rs gets a good suggestion or solution for this.
I'm currently investigating switching from my Databricks job to something more lightweight, and this seems like a rather big roadblock when one of the core features of Delta tables is schema evolution. We currently do it with spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true"), which of course is not applicable here :)
+1 on this. I found a way to hack around this limitation by running some code like this, which appends a new `MetaData` action to the Delta log whenever the expected schema differs from the table's current one:

```rust
let delta_fields = vec![/* ...my expected fields... */];
let mut delta_schema = table.get_schema().expect("No schema").clone();
// HACK: Enable schema evolution by adding a MetaData to the delta log
if delta_schema.get_fields().len() < delta_fields.len() || delta_schema.get_fields().iter().enumerate().any(|(i, f)| *f != delta_fields[i]) {
println!("New columns added");
let metadata = DeltaTableMetaData::new(
None,
None,
None,
SchemaTypeStruct::new(delta_fields),
vec!["date".to_string()],
HashMap::new(),
);
let meta = action::MetaData::try_from(metadata)?;
let actions = vec![Action::metaData(meta)];
let storage = table.object_store();
commit(storage.as_ref(), &actions, &table.state).await?;
table.load().await?;
delta_schema = table.get_schema().expect("No schema").clone();
}
/** Crate private methods copied from delta rust/src/operations/transaction/mod.rs */
fn get_commit_bytes(
actions: &Vec<Action>,
) -> Result<bytes::Bytes, TransactionError> {
let mut extra_info = serde_json::Map::<String, Value>::new();
let mut commit_info: CommitInfo = Default::default();
commit_info.timestamp = Some(Utc::now().timestamp_millis());
extra_info.insert(
"clientVersion".to_string(),
Value::String(format!("delta-rs.{}", crate_version())),
);
commit_info.info = extra_info;
Ok(bytes::Bytes::from(log_entry_from_actions(
actions
.iter()
.chain(std::iter::once(&Action::commitInfo(commit_info))),
)?))
}
fn log_entry_from_actions<'a>(
actions: impl IntoIterator<Item = &'a Action>,
) -> Result<String, TransactionError> {
let mut jsons = Vec::<String>::new();
for action in actions {
let json = serde_json::to_string(action)
.map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
jsons.push(json);
}
Ok(jsons.join("\n"))
}
const DELTA_LOG_FOLDER: &str = "_delta_log";
/// Low-level transaction API. Creates a temporary commit file. Once created,
/// the transaction object could be dropped and the actual commit could be executed
/// with `DeltaTable.try_commit_transaction`.
pub(crate) async fn prepare_commit<'a>(
storage: &dyn ObjectStore,
actions: &Vec<Action>,
) -> Result<Path, TransactionError> {
// Serialize all actions that are part of this log entry.
let log_entry = get_commit_bytes(actions)?;
// Write delta log entry as temporary file to storage. For the actual commit,
// the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
let token = uuid::Uuid::new_v4().to_string();
let file_name = format!("_commit_{token}.json.tmp");
let path = Path::from_iter([DELTA_LOG_FOLDER, &file_name]);
storage.put(&path, log_entry).await?;
Ok(path)
}
/// Commit a transaction, with up to 5 retries. This is low-level transaction API.
///
/// Will error early if a concurrent transaction has already been committed
/// and conflicts with this transaction.
pub async fn commit(
storage: &dyn ObjectStore,
actions: &Vec<Action>,
read_snapshot: &DeltaTableState,
) -> Result<i64> {
let tmp_commit = prepare_commit(storage, actions).await?;
let version = read_snapshot.version() + 1;
try_commit_transaction(storage, &tmp_commit, version).await.map_err(|e| e.into())
}
fn commit_uri_from_version(version: i64) -> Path {
let version = format!("{version:020}.json");
Path::from(DELTA_LOG_FOLDER).child(version.as_str())
}
/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists]
/// if the given `version` already exists. The caller should handle the retry logic itself.
/// This is low-level transaction API. If user does not want to maintain the commit loop then
/// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction`
/// with retry logic.
async fn try_commit_transaction(
storage: &dyn ObjectStore,
tmp_commit: &Path,
version: i64,
) -> Result<i64> {
// move temporary commit file to delta log directory
// rely on storage to fail if the file already exists -
storage
.rename_if_not_exists(tmp_commit, &commit_uri_from_version(version))
.await
.map_err(|err| anyhow!("Failed to commit {:?}", err))?;
Ok(version)
}
```
Is there any news on this topic, or on a fork with this improvement?
Hi, this would be great to have. We have been discussing in Polars #11983 that having merge out of the box in write_delta() would be awesome, now that release v0.12 supports MERGE. If schema evolution is added to TableMerger(), this integration would be a bit more straightforward. BR
Hi, we would like this as well. It would be great to have.
Hello, I would be grateful if the option to change the schema when merging were implemented.
Are there any updates on this? Is anyone working on it?
@BarakStout no one is working on this afaik
This commit introduces the `WriteMode` enum and the ability to specify writes which should enable [schema evolution](https://delta.io/blog/2023-02-08-delta-lake-schema-evolution/). The result is a new `metaData` action added to the transaction log with the write, reflecting the updated schema. There are some caveats, however, such as that all writes must include non-nullable columns. This change does not modify the Write operation, which has a datafusion dependency. Unfortunately we have some redundancy in the API surface insofar as the writer in src/operations/ just performs parquet writes. The Write operation, however, requires datafusion and will actually effect transaction log writes. Fixes delta-io#1386 Sponsored-by: Raft, LLC.
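For orientation, here is a rough usage sketch of the writer-level schema evolution this commit describes. The names (`RecordBatchWriter::write_with_mode`, `WriteMode::MergeSchema`, `flush_and_commit`) are taken from recent delta-rs releases and may differ in your version, so treat this as an illustrative sketch rather than the definitive API:

```rust
// Sketch only: append a batch that has one more column than the table,
// asking the writer to evolve the table schema instead of rejecting the batch.
use std::sync::Arc;

use deltalake::arrow::array::{ArrayRef, Int64Array, StringArray};
use deltalake::arrow::datatypes::{DataType, Field, Schema};
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::writer::{DeltaWriter, RecordBatchWriter, WriteMode};
use deltalake::{open_table, DeltaTableError};

async fn append_with_evolution(uri: &str) -> Result<(), DeltaTableError> {
    let mut table = open_table(uri).await?;

    // Incoming data carries an extra nullable "comment" column the table does not have yet.
    // Note the caveat above: existing non-nullable columns (here "id") must still be present.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("comment", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
            Arc::new(StringArray::from(vec![Some("a"), None])),
        ],
    )?;

    let mut writer = RecordBatchWriter::for_table(&table)?;
    // MergeSchema widens the table schema via a new metaData action in the log.
    writer.write_with_mode(batch, WriteMode::MergeSchema).await?;
    writer.flush_and_commit(&mut table).await?;
    Ok(())
}
```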
Hi, due to this missing feature we are not able to leverage delta-rs.
@srsharathreddy-92 this is currently being worked on by @rtyler :)
I am keeping this open until it's also part of the operations API writer, because that's the one that's also used by the Python bindings.
Hi, is someone working on adding this to the Python bindings? I am excited to start using this feature :)
I'm not too deep in the code, but I guess that's not too hard to do. If you give me some hints, I can give it a try. This unblocks very interesting use cases for us.
@aersam the schema evolution logic got added into the RecordBatchWriter, which sits here: crates/core/src/writer/record_batch.rs. However, we want this logic to be added to the write operation as well, so you need to check how much of the schema evolution can be reused and copied/modified into the write operation. You should likely follow a similar pattern: allow the mergeSchema parameter to be set, merge the table schema and the Arrow RecordBatch schema, and then create columns with null values for the ones that are in the set difference of the merged schema. Hope that helps you get started :)
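A rough sketch of that merge-and-pad step using the arrow crate directly; the function name `merge_batch_schema` is made up for illustration and is not part of the delta-rs API:

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Merge the table schema with the incoming batch schema, then pad the batch
/// with all-null columns for any merged field the batch does not carry.
fn merge_batch_schema(table_schema: &Schema, batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    // Union of both schemas; fails if a column appears in both with incompatible types.
    let merged = Arc::new(Schema::try_merge(vec![
        table_schema.clone(),
        batch.schema().as_ref().clone(),
    ])?);

    // For each merged field, reuse the batch column if present, otherwise fill with
    // nulls (the "set difference" case mentioned above). Columns that are non-nullable
    // in the table schema still have to be provided by the batch.
    let columns: Vec<ArrayRef> = merged
        .fields()
        .iter()
        .map(|field| match batch.column_by_name(field.name()) {
            Some(column) => column.clone(),
            None => new_null_array(field.data_type(), batch.num_rows()),
        })
        .collect();

    RecordBatch::try_new(merged, columns)
}
```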
It can only be exposed to Python once it's added to the write operation, which is currently not yet the case.
I'm trying to do that, but a bit simpler. We know the schema of the given data anyway, so I can do it upfront, I guess? Also, I try to preserve the partitioning columns; I'd expect the new schema to contain the same partition columns. I should probably test that assumption, or even just take the partitioning columns that are given. About the null columns: I don't think we even need them, according to the spec. My PR is in a very early stage, no testing done yet, not even manually. However, if you want to have a look, to prevent me from doing something completely wrong, feel free to review.
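A tiny sketch of that assumption check, reusing the merged Arrow schema from the previous sketch; the helper is hypothetical and not delta-rs code:

```rust
use arrow::datatypes::Schema;

// Verify that every partition column of the existing table is still present
// in the merged schema before committing the new metadata.
fn partition_columns_preserved(merged: &Schema, partition_columns: &[String]) -> bool {
    partition_columns
        .iter()
        .all(|name| merged.field_with_name(name).is_ok())
}
```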
…ust engine) This replaces the old "overwrite_schema" parameter with a schema_write_mode parameter that basically allows distinguishing between overwrite/merge/none. Fixes #1386
Hi, I wanted to follow up on this thread. Given that there is support to merge schemas now via ..., I tried doing:
where ... Are there any workarounds to achieve this desired behavior?
PR #2289 would fix the error; I hope I can finish it soon.
@aersam maybe we should hold off on that one until all timeout issues are addressed.
Oh, there are timeout issues? 🙂 Which ones are you talking about?
Ok, it would be interesting to see whether those are even resolved by the PR, since writing starts much earlier and way less RAM is required.
@aersam maybe yeah
Description
Originally asked on Stack Overflow: how to add a new column when writing to a Delta table.
Use Case
One of my Delta tables has 4 columns. New incoming data has one more column, so 5 columns in total. I am hoping to use `write_deltalake` to write the new data with `mode="append"` to the same Delta table. In Spark, it is possible to write new data with more columns by using the `mergeSchema` option, based on this doc. It would be great to support a schema evolution `mergeSchema` option for `write_deltalake`. Thanks! 😃