_internal.DeltaError when merging #2084

Closed
halvorlu opened this issue Jan 16, 2024 · 20 comments · Fixed by #2291
Labels
bug Something isn't working

Comments

@halvorlu

Environment

Delta-rs version: 0.15.1

Binding: Python

Environment:

  • Cloud provider: Azure (deltatable stored in Azure)
  • OS: Linux/OpenShift

Bug

What happened:
I try to merge data into an existing deltatable like so:

# Parentheses are required so the chained calls can span several lines.
(
    deltatable.merge(
        mergeable_table,
        "target._merge_key = source._merge_key",
        source_alias="source",
        target_alias="target",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)

This sometimes works, and sometimes fails with the following error:
_internal.DeltaError: Generic DeltaTable error: Schema error: No field named target._merge_key. Valid fields are _merge_key, <other camel-case columns>

What you expected to happen:
I expect the merge to succeed. The error claims that target._merge_key is not a valid field name, but _merge_key is, which is a bit strange.

How to reproduce it:
I have had trouble reproducing this locally and/or reliably.
The merge operations are run as part of a batch job which begins with

deltatable.optimize.compact()
deltatable.vacuum(dry_run=False, retention_hours=48, enforce_retention_duration=False)
deltatable.cleanup_metadata()

followed by a number of merge operations. If the first merge fails, all subsequent merge operations also fail. Sometimes the first merge succeeds, and then all subsequent merges also seem to succeed. So I wonder if any of these optimize/vacuum/cleanup methods could (sometimes) corrupt the table? (Maybe relevant: Are these operations run synchronously or asynchronously?)

@halvorlu halvorlu added the bug Something isn't working label Jan 16, 2024
@ion-elgreco
Collaborator

@halvorlu do you have concurrent writers to the same table?

@halvorlu
Author

There should not be concurrent writers, no.

@Blajda
Collaborator

Blajda commented Jan 17, 2024

@halvorlu Is this table partitioned?
One hunch I have is that the merge is being executed with zero records from the source.
Ideally the operation should be able to handle this case without issue.

Tested with an empty source with the correct schema and no issue.

Can you add additional logging of the schema for the source data?

@halvorlu
Author

halvorlu commented Jan 18, 2024

I added this logging of field names:

sorted_field_list = sorted([f.name for f in deltatable.schema().fields])
logger.info(f"Sorted field list of existing schema: {sorted_field_list}")
logger.info(f"Sorted field list of new data: {sorted(mergeable_table.schema.names)}")

And the field names are identical.

@halvorlu
Author

Do you know where this error is thrown? I've tried searching in the code, but haven't found out where.
_internal.DeltaError: Generic DeltaTable error: Schema error: No field named target._merge_key. Valid fields are _merge_key, <other camel-case columns>

@halvorlu
Author

And yes, the table is partitioned on two columns.

@ion-elgreco
Collaborator

@halvorlu can you try making a minimal reproducible example with sample data?

@germanmunuera

I have the same problem!
Multiple writes to a table partitioned by one column.
The first write is fine; subsequent writes throw a DeltaError exception, and the _delta_log directory contains several _commit_xxxxx.json.tmp files.
Data is saved to S3 but Delta Lake versions are not updated.

@halvorlu
Author

@ion-elgreco I've tried to reproduce this reliably, but haven't really been able to...
@germanmunuera Are you using write or merge?

@germanmunuera

I'm using merge

@germanmunuera

germanmunuera commented Jan 26, 2024

Context:

  • Created DynamoDB table and IAM role
		aws dynamodb create-table \
			--table-name delta_rs_lock_table \
			--region eu-west-1 \
			--attribute-definitions \
				AttributeName=tablePath,AttributeType=S \
				AttributeName=fileName,AttributeType=S \
			--key-schema \
				AttributeName=tablePath,KeyType=HASH \
				AttributeName=fileName,KeyType=RANGE \
		    --provisioned-throughput \
		        ReadCapacityUnits=10,WriteCapacityUnits=10
		{
			"Version": "2012-10-17",
			"Statement": [
				{
					"Sid": "DeltalakeDynamoLock",
					"Effect": "Allow",
					"Action": [
						"dynamodb:GetItem",
						"dynamodb:PutItem",
						"dynamodb:DeleteItem",
						"dynamodb:Query",
						"dynamodb:UpdateItem"
					],
					"Resource": "arn:aws:dynamodb:eu-west-1:xxxxxxx:table/delta_rs_lock_table"
				}
			]
		}
  • Set AWS_S3_LOCKING_PROVIDER = 'dynamodb' and DELTA_DYNAMO_TABLE_NAME = 'delta_rs_lock_table'
  • Executing several writes for different partitions (the tasks are orchestrated with Dagster). I'm using Polars but with DeltaTable you have the same results
	    def upsert(self, target: str, df: pl.DataFrame, partition_by: str | Sequence[str] | None = None) -> None:
	        if df.is_empty():
	            return

	        storage_options = self.storage_options.dict() if self.storage_options else {}
	        storage_options['AWS_S3_LOCKING_PROVIDER'] = 'dynamodb'
	        storage_options['DELTA_DYNAMO_TABLE_NAME'] = 'delta_rs_lock_table'

	        delta_schema: Any = _convert_pa_schema_to_delta(
	            df.to_arrow().schema, True)
	        partition_by = [] if partition_by is None else [partition_by] if isinstance(
	            partition_by, str) else partition_by
	        delta_write_options = {
	            'schema': delta_schema,
	            'partition_by': partition_by
	        }

	        try:
	            delta_merge_options = {
	                'predicate': 's.id = t.id',
	                'source_alias': 's',
	                'target_alias': 't'
	            }
	            (
	                df.write_delta(target, mode='merge', overwrite_schema=True, storage_options=storage_options,
	                               delta_merge_options=delta_merge_options, delta_write_options=delta_write_options)
	                .when_matched_update_all()
	                .when_not_matched_insert_all()
	                .execute()
	            )
	        except TableNotFoundError as err:
	            df.write_delta(target, mode='append', overwrite_schema=True, storage_options=storage_options,
	                           delta_write_options=delta_write_options)

  • The table exists in S3. The first process works; all subsequent processes fail.

If you execute the tasks sequentially, it works without problems.

@germanmunuera

Any news about this?

@ion-elgreco
Collaborator

@rtyler do you have any thoughts here? Could it be related to something with the logstore implementation?

@ericvandever

I hit this issue this week when doing merges from multiple concurrent writers with dynamodb locking provider enabled. From the analysis I did, the error seems to originate in the retry recovery after getting a TransactionError::VersionAlreadyExists error. That function tries to resolve the table merger predicate against the pyarrow schema from a default datafusion session. This will fail with a schema exception because the merge predicate ("target._merge_key = source._merge_key") is only valid against the joint schema of 'source' and 'target' as built in operations/merge build_join_schema.

It seems like merge execute needs to build up proper predicates for use in the retry function based on all the inputs and add them to the DeltaOperation::Merge operation so that they are available in commit_with_retries. Due to the complexity of that merge code, I ended up just adding user-level retry logic around the whole merge.execute() call and will pay the extra expense of writing the data again when this occurs (even if the writes touch separate partitions and should succeed).
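
The user-level retry described above could look roughly like the following. This is a minimal sketch, not the code used in this thread: `run_with_retries` and its parameters are hypothetical names, and the choice of which exception to retry on (for example `deltalake.exceptions.DeltaError`) is an assumption the caller has to make.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def run_with_retries(
    operation: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    max_attempts: int = 3,
    base_delay_s: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `operation` until it succeeds or `max_attempts` is reached."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error unchanged
            # Exponential backoff: base, 2*base, 4*base, ...
            sleep(base_delay_s * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


# Hypothetical usage (assumption: `build_merge` is your own function that
# reloads the table and rebuilds the merge on every attempt):
#
#   from deltalake.exceptions import DeltaError
#   run_with_retries(lambda: build_merge().execute(), retry_on=(DeltaError,))
```

Rebuilding the merge inside the callable matters here, because a retry presumably needs a fresh view of the table rather than the state captured before the failed commit.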

Aside: one of my colleagues currently runs into another error, "_internal.DeltaError: Invalid table version: 202", originating from that same retry block when calling write_deltalake with mode="overwrite" from 5 concurrent writers (in a test). I can't recreate it, but I have significantly more latency to our S3 region. I haven't investigated this one as deeply, but it seems like another edge case to be aware of within the retry logic.

@ericvandever

ericvandever commented Feb 2, 2024

> There should not be concurrent writers, no.

I have also seen this issue pop up when executing 2 merges against the same table sequentially. We have a case where we delete based on a key and then insert some data in subsequent merge statements.

@franz101
Contributor

franz101 commented Mar 3, 2024

> @halvorlu do you have concurrent writers to the same table?

I have concurrent writes and use DynamoDB. I think the error should be clearer when that happens.

@JonasDev1
Contributor

It seems that the Rust merge commitInfo implementation is different from the Spark/Databricks implementation.
Databricks:
"operationParameters":{ "predicate":"[\"((eventId#33510940 = eventId#83944) AND (country#33510941 = country#83945))\"]", "matchedPredicates":"[{\"predicate\":\"(meta#33510942[event_time] < meta#83982[event_time])\",\"actionType\":\"update\"}]", "notMatchedPredicates":"[{\"actionType\":\"insert\"}]", "notMatchedBySourcePredicates":"[]" },
(I don't know what this eventId#33510940 means, but I think it's a ref to a parquet file?)

Delta-rs:
"operationParameters":{ "matchedPredicates":"[{\"actionType\":\"update\",\"predicate\":\"source.meta_event_name != 'REMOVE'\"},{\"actionType\":\"delete\",\"predicate\":\"source.meta_event_name = 'REMOVE'\"}]", "predicate":"source.eventId = target.eventId", "notMatchedBySourcePredicates":"[]", "notMatchedPredicates":"[{\"actionType\":\"insert\",\"predicate\":\"source.meta_event_name != 'REMOVE'\"}]" },

If we want to solve this problem, we have to change the predicate structure, the easiest way would be to remove the alias, but it's still different to the Spark implementation. In delta-rs, I believe this predicate is currently only used for the merge operation

@ion-elgreco
Collaborator

@JonasDev1 what's the issue here?

The commitInfo is basically free format and not a requirement of the protocol.

@JonasDev1
Contributor

JonasDev1 commented Mar 15, 2024

The issue is that the conflict checker doesn't recognise alias names such as source and target in operationParameters.predicate and thinks they are struct fields.

Possible solution:
Remove the aliases from the predicate (source.eventId = target.eventId --> eventId = eventId) and check whether the conflict checker accepts this. Keeping the aliases is useless for conflict checks, because the conflict checker has no mapping back to the original names anyway.

If the conflict checker has issues with eventId = eventId, we also need to remove predicates without static values (a static predicate is, for example, country = 'us').

But as you mentioned, the commitInfo is free format, so we can keep source.eventId = target.eventId in a separate field, mergePredicate.

I will try this out in the next few days.
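
To illustrate the alias-removal idea, here is a deliberately naive sketch. It works on the predicate string with a regular expression, whereas an actual fix in delta-rs would presumably operate on parsed expressions; `strip_aliases` is a hypothetical helper name, not a delta-rs function.

```python
import re
from typing import Sequence


def strip_aliases(predicate: str, aliases: Sequence[str] = ("source", "target")) -> str:
    """Remove `alias.` prefixes from column references in a predicate string.

    Naive on purpose: it does not parse SQL, so text inside string
    literals that happens to look like `source.x` is rewritten as well.
    """
    alternatives = "|".join(re.escape(alias) for alias in aliases)
    # \b stops the pattern from matching inside names such as `datasource.`
    return re.sub(rf"\b(?:{alternatives})\.", "", predicate)


print(strip_aliases("source.eventId = target.eventId"))
print(strip_aliases("source.id = target.id AND target.country = 'us'"))
```

The first call yields `eventId = eventId`, which carries no information for a conflict check. That is why join conditions without static values may have to be dropped entirely, as suggested above.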

ion-elgreco pushed a commit that referenced this issue Mar 19, 2024
# Description
This merge request will change the
commitInfo.operationParameters.predicate of the merge operation. This is
required to allow conflict checking on concurrent commits. Before, the
predicate was just the simple merge predicate like `source.eventId =
target.eventId`, but the conflict checker doesn't know these aliases and
doesn't have access to the source df.
So I now use the early_filter, which is also used before to filter a
subset of the target table. Basically, the early_filter only contains
static filters and partition filters that are converted to fixed values
based on the source data. The early_filter can be None if no
column/partition pre-filtering is possible, or if the merge contains a
not_match_source operation. (See the generalize_filter function in the
file).
The commitInfo predicate uses exactly this filter, except that the
target alias is removed.

The predicate is used by the conflict checker, for example when there
are multiple concurrent merges. If there is a predicate, the conflict
checker will check whether the concurrent commit wrote or deleted data
within that predicate. If the predicate is None, the conflict checker
will treat the commit as a `read_whole_table` and interpret any
concurrent updates as a conflict.

Example:
Target table with partition country
Merge with predicate `source.id = target.id AND target.country='DE'` ->
CommitInfo predicate `country='DE'`
Merge with predicate `source.id = target.id AND
target.country=source.country` -> CommitInfo predicate `country='DE' OR
country='US'`
Merge with predicate `source.id = target.id` -> CommitInfo predicate
None (As full target table join is required)


# Related Issue(s)
- closes #2084
- closes #2227

---------

Co-authored-by: Jonas Schmitz <[email protected]>
Co-authored-by: David Blajda <[email protected]>
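
The examples in the commit description above can be mimicked with a small sketch. It is not the delta-rs implementation (that lives in Rust and works on DataFusion expressions); `commit_predicate` and `may_conflict` are hypothetical names, and treating an empty source like "no predicate" is an assumption made here to stay on the conservative side.

```python
from typing import Iterable, Optional, Set


def _quote(value: str) -> str:
    # SQL-style string literal; embedded single quotes are doubled.
    return "'" + value.replace("'", "''") + "'"


def commit_predicate(
    partition_column: str, source_values: Optional[Iterable[str]]
) -> Optional[str]:
    """Build a predicate such as "country='DE' OR country='US'".

    `source_values` holds the partition values found in the source data.
    None means no pre-filtering is possible (full target join).
    """
    if source_values is None:
        return None
    distinct = sorted(set(source_values))
    if not distinct:
        return None  # assumption: an empty source gives no usable filter
    return " OR ".join(f"{partition_column}={_quote(v)}" for v in distinct)


def may_conflict(own_values: Optional[Set[str]], concurrent_values: Set[str]) -> bool:
    """Simplified conflict rule for a single partition column."""
    if own_values is None:
        # No predicate: treated as reading the whole table, so any
        # concurrent change counts as a conflict.
        return len(concurrent_values) > 0
    return not own_values.isdisjoint(concurrent_values)


print(commit_predicate("country", ["US", "DE", "DE"]))  # country='DE' OR country='US'
print(may_conflict({"DE"}, {"US"}))  # False
```

Under these assumptions, two merges that touch different partitions do not conflict, while a merge without a partition filter conflicts with every concurrent change.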
@JonasDev1
Contributor

This issue is now fixed, but there is another issue after the commit.
The merge can't return the new table because the snapshot is not updated.
If you don't need the table afterwards, you can handle a DeltaTableError::Generic("Version mismatch") error as a successful result.

#2279
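
From Python, the workaround above might be sketched as follows. This assumes the Rust error message "Version mismatch" appears in the text of the exception raised by the Python bindings, which this thread does not confirm; both function names are hypothetical.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def is_version_mismatch(exc: BaseException) -> bool:
    # Assumption: the Rust message is carried over into the Python error text.
    return "Version mismatch" in str(exc)


def execute_ignoring_version_mismatch(execute: Callable[[], T]) -> Optional[T]:
    """Run `execute`; treat a "Version mismatch" error as success.

    Returns None in that case, because the refreshed table and the merge
    metrics are not available.
    """
    try:
        return execute()
    except Exception as exc:
        if is_version_mismatch(exc):
            return None
        raise  # anything else is a real failure
```

Matching on message text is fragile, so this only makes sense as a stopgap until the snapshot issue referenced above is resolved.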

7 participants