XComObjectStorageBackend returns the S3 path during deserialization instead of the data #39602

Closed
1 of 2 tasks
Uture opened this issue May 14, 2024 · 9 comments
Labels: affected_version:2.9, AIP-58, area:core, kind:bug, needs-triage

Comments

@Uture

Uture commented May 14, 2024

Apache Airflow version

2.9.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

After configuring the object storage as XCom backend, the serialization works fine above the specified threshold, but once another task consumes the previously stored XCom, the deserialization doesn't seem to work. Instead of the deserialized data, the path of the object is returned.

What you think should happen instead?

The stored object should be deserialized and returned to the downstream task.

How to reproduce

import pendulum
from airflow.decorators import dag, task


@dag(
    schedule=None,
    catchup=False,
    start_date=pendulum.datetime(2024, 1, 1, tz="utc"),
)
def dag_test():

    @task()
    def producer():
        import random

        return [random.randint(0, 100) for _ in range(10_000)]

    @task()
    def consumer(obj):
        print(obj)

    producer_t = producer()
    producer_t >> consumer(producer_t)


dag_test()
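To confirm that the producer's return value actually lands above the configured threshold, its serialized size can be estimated outside of Airflow. This is only a rough sketch: it uses plain `json.dumps`, which may differ slightly from the encoder Airflow applies, and `THRESHOLD_BYTES` is a hypothetical value standing in for whatever is set in the deployment.

```python
import json
import random


def serialized_size(value) -> int:
    """Return the size in bytes of the plain-JSON encoding of value."""
    return len(json.dumps(value).encode("utf-8"))


# Hypothetical threshold in bytes; the real value comes from your configuration.
THRESHOLD_BYTES = 1024

# Same shape of payload as the producer task in the reproduction.
payload = [random.randint(0, 100) for _ in range(10_000)]
size = serialized_size(payload)
print(f"{size} bytes, above threshold: {size > THRESHOLD_BYTES}")
```

With 10,000 integers the encoded list is at least 30,000 bytes, so it sits well above a threshold in the low-kilobyte range.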

Operating System

apache/airflow:2.9.0-python3.11

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Uture added the area:core, kind:bug, and needs-triage labels on May 14, 2024

boring-cyborg bot commented May 14, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@kaxil
Member

kaxil commented May 14, 2024

cc @bolkedebruin I haven't looked at the code, but was expecting that during deserialization this works out of the box.

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html#object-storage-xcom-backend

@kaxil
Member

kaxil commented May 14, 2024

cc @uranusjr

def deserialize_value(result: XCom) -> Any:
    """Deserializes the value from the database or object storage.
    Compression is inferred from the file extension.
    """
    data = BaseXCom.deserialize_value(result)
    try:
        path = XComObjectStorageBackend._get_full_path(data)
    except (TypeError, ValueError):  # Likely value stored directly in the database.
        return data
    try:
        with path.open(mode="rb", compression="infer") as f:
            return json.load(f, cls=XComDecoder)
    except (TypeError, ValueError):
        return data
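The control flow quoted above has two fallbacks that both return `data` unchanged. The sketch below mimics that shape with local files instead of object storage, to show one way a reference string could come back instead of the payload. It is not Airflow code and makes no claim about the actual root cause of this issue; `resolve_reference` is a hypothetical stand-in for `_get_full_path`.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def resolve_reference(data: Any, base: Path) -> Path:
    """Hypothetical stand-in for _get_full_path: accept only paths under base."""
    if not isinstance(data, str):
        raise TypeError("not a string, so not a storage reference")
    path = Path(data)
    if not path.is_relative_to(base):
        raise ValueError("string does not point into the storage location")
    return path


def deserialize(data: Any, base: Path) -> Any:
    """Same try/except shape as the quoted function, using local files."""
    try:
        path = resolve_reference(data, base)
    except (TypeError, ValueError):  # value was stored inline
        return data
    try:
        with path.open("rb") as f:
            return json.load(f)
    except (TypeError, ValueError):  # unreadable payload: fall back silently
        return data


base = Path(tempfile.mkdtemp())
good = base / "good.json"
good.write_text(json.dumps([1, 2, 3]))
broken = base / "broken.json"
broken.write_text("not json")

inline_result = deserialize([4, 5], base)       # inline value passes through
good_result = deserialize(str(good), base)      # stored payload is loaded
broken_result = deserialize(str(broken), base)  # the path string comes back
print(inline_result, good_result, broken_result)
```

In the third case the caller cannot tell that anything went wrong, because the second `except` branch hands back the reference itself.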

@uranusjr
Member

I seem to recall @TJaniF had a very similar issue a while ago. I don't remember the details, but it came down to some sort of configuration issue in that instance. If this is indeed not a bug in Airflow's logic, we should try to detect the configuration issue and surface it to the user more clearly, instead of returning a wrong value.
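As a rough illustration of "raise instead of returning a wrong value", here is a minimal sketch that again uses local files rather than object storage. Everything in it is hypothetical (`XComReferenceError`, `deserialize_strict`, and the rule that a string under `base` counts as a reference); it is not how Airflow implements or fixed this.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


class XComReferenceError(RuntimeError):
    """Hypothetical error: a stored reference could not be turned back into data."""


def deserialize_strict(data: Any, base: Path) -> Any:
    # Anything that is not a string under `base` is treated as an inline value.
    if not isinstance(data, str) or not Path(data).is_relative_to(base):
        return data
    try:
        with Path(data).open("rb") as f:
            return json.load(f)
    except (OSError, ValueError) as exc:
        # Surface the problem instead of handing the reference to the task.
        raise XComReferenceError(f"could not load XCom payload from {data}") from exc


base = Path(tempfile.mkdtemp())
missing = str(base / "missing.json")  # a reference whose file does not exist

try:
    deserialize_strict(missing, base)
    outcome = "returned a value"
except XComReferenceError as exc:
    outcome = f"raised: {exc}"
print(outcome)
```

The trade-off is that a downstream task fails loudly on a broken reference, which seems preferable to it silently receiving a path string.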

@TJaniF
Contributor

TJaniF commented May 14, 2024

Yes, I had the same issue, and as far as I am aware this PR fixes it: #39313. So it should be fixed in 2.9.2 :)

@uranusjr
Member

Good to hear Astronomer is now also in the time machine business.

@bolkedebruin
Contributor

Well thanks. That was quick :-).

@bolkedebruin
Contributor

Note: @Uture I don't think the fix will apply retroactively to XCom values that were already stored. You will need to regenerate those.

@Uture
Author

Uture commented May 14, 2024

Great, thank you all for resolving this so quickly.
