Annotate a Dataset Event in the Source Task #37810

Closed
2 tasks done
uranusjr opened this issue Mar 1, 2024 · 10 comments · Fixed by #38650
Labels
kind:feature Feature Requests

Comments

@uranusjr
Member

uranusjr commented Mar 1, 2024

Description

To eventually support the construct and UI we’re aiming for in assets, we need to attach metadata to the actual data, not the task that produces it, nor the location it is written to.

In the task-based web UI, we can show the attached metadata on the task that emits the dataset event, to give the impression that the metadata is directly associated with the task. In the implementation, however, the metadata would only be associated with the dataset, and only indirectly related to the task by the fact that the task emits the event.

Use case/motivation

An Airflow task generating data may want to attach information about it. Airflow does not currently provide a good interface for this. The only thing that resembles such a feature is to attach an extra dict on Dataset like this:

from airflow.datasets import Dataset
from airflow.decorators import task

@task(outlets=[Dataset("s3://bucket/key", extra={"xx": "yy"})])
def producer():
    ...  # Write to the file on S3

This is, however, quite limiting. It may be good enough for static information such as who owns the data, but not for information that is only known at runtime and provides additional context to the generated data.

Store runtime-populated extras on DatasetEvent

When a Dataset event is emitted, the corresponding DatasetEvent model in the database already has a field called extra. This field is, however, currently only populated when the event is created via the REST API, not when the event is generated from a task outlet.
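For comparison, the REST path that already populates extra looks roughly like this. This is a minimal sketch using requests; the host, credentials, and exact payload shape are assumptions based on the 2.9-era create-dataset-event endpoint.

import requests

# Sketch only: endpoint path and payload follow the 2.9-era
# "create dataset event" REST API; host and credentials are placeholders.
resp = requests.post(
    "http://localhost:8080/api/v1/datasets/events",
    auth=("admin", "admin"),
    json={
        "dataset_uri": "s3://bucket/key",
        "extra": {"desc": "foo bar"},  # this extra does end up on DatasetEvent
    },
)
resp.raise_for_status()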

A previous design discussion contains the following comment from @blag:

The only intent that I ever had (and note: I was not the author of the dataset AIP) regarding the extra fields for datasets and dataset events, was to allow third party integrations to easily store information from external systems that wasn't captured in Airflow's database schema, eg: to do so without forking the schema migrations.

and

But if all you are looking to do is pass information between task instances in the same DAG run or between task instances in different DAG runs, I believe the Airflow mechanism to do this is XComs, even with data-aware Airflow.

However, I would argue that user code in an Airflow DAG should also have the ability to store custom information. While the information is readable in downstream tasks (so technically it is a mechanism to pass data between tasks), the main intention behind the design is to annotate the generated data, which does not go against the original design.

Provide extras at runtime

The task function (either @task-decorated, or a classic operator’s execute) will be able to attach values to a Dataset URI in the function. This is done through an accessor proxy exposed under the context key dataset_events, so in the task function you can write:

@task(outlets=[Dataset("s3://bucket/key")])
def producer(*, dataset_events: dict[str, DatasetEventProxy]):
    dataset_events["s3://bucket/key"].extra["desc"] = "foo bar"

After the task function’s execution, the extras provided dynamically are written to the DatasetEvent entry generated for the Dataset. Note especially that this is entirely distinct from extra on Dataset.

Instead of using the URI, you can use the Dataset object directly to access the proxy:

target = Dataset("s3://bucket/key")

@task(outlets=[target])
def producer(*, dataset_events: dict[str, DatasetEventProxy]):
    dataset_events[target].extra["desc"] = "foo bar"

Example using the context dict instead:

from airflow.operators.python import get_current_context

@task(outlets=[target])
def producer():
    context = get_current_context()
    context["dataset_events"][target].extra["desc"] = "foo bar"

With a classic operator:

from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context

class MyOperator(BaseOperator):
    def execute(self, context: Context) -> None:
        super().execute(context)
        context["dataset_events"][target].extra["desc"] = "foo bar"

Show dataset event extras in web UI

Both dataset and dataset event extras currently have zero visibility to users in the web UI. This is somewhat acceptable for datasets, where the extra dict is static, but is a problem for dynamically generated values. Additional components should be added to the web UI to display extras emitted by a dataset event.

An obvious first addition would be to add a table in the task instance panel in the Grid view when the task instance emits dataset events with extras. Quick UI mock:

[UI mock: dataset-extra-view-mock]

Each key and value will simply be stringified for display in the table. This should be enough for simple data since the extra dict currently needs to be JSON-compatible. We can discuss richer data (similar to how Jupyter displays a DataFrame), and putting this information in other places (e.g. in the Dataset view), in the future.
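In code terms the rendering amounts to roughly the following sketch, assuming the values are JSON-compatible as the extra dict requires today:

import json

def stringify_extra(extra: dict) -> dict[str, str]:
    # Render each value as a compact JSON string for the table cells.
    return {key: json.dumps(value) for key, value in extra.items()}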

Related issues

#35297
#36075

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@uranusjr added the kind:feature (Feature Requests) and needs-triage (label for new issues that we didn't triage yet) labels on Mar 1, 2024
@jedcunningham
Member

LGTM, looking forward to this 👍

@jedcunningham removed the needs-triage (label for new issues that we didn't triage yet) label on Mar 1, 2024
@jscheffl
Contributor

jscheffl commented Mar 4, 2024

Hi @uranusjr, I have been thinking of the same/similar feature for many weeks, especially in data-driven use cases. We also have a DAG that potentially generates dataset events, but in our use case we need to add context, for example the file name in the S3 dataset or a UUID. And since this context is needed and is dynamic, it would be a waste to create thousands of datasets for thousands of events on specific files in S3.

I like the idea of attaching extra to the events, but I was thinking a bit more pragmatically in terms of what we already have. Did you consider:

  • Using the resulting XCom (i.e. the return value of the task, assuming xcom_push=True) as the context information? Then we would not need another mechanism such as a DatasetEventProxy object to pass this information (see the sketch below).
  • Maybe still using the extra field, storing the task output/XCom there for traceability/persistence, and using it as DAG trigger params for the DAG being triggered by the dataset event? Then we also would not need a new mechanism to receive event data.

Pro: the existing params JSON schema validation and defaults could be used, and you could also trigger a dataset-triggered DAG manually for testing.
Con: a task might raise an event with non-compliant data that does not match the JSON schema. Where would the error be reported if the resulting conf does not validate against the triggered DAG run's params JSON schema: mark the DAG run failed, or the emitting task failed? But this might be a very small side effect to discuss.
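A rough sketch of that first alternative; the extra_from_return flag is purely hypothetical here and only illustrates the idea of reusing the task's return value:

from airflow.datasets import Dataset
from airflow.decorators import task

# Hypothetical flag: copy the task's return value (its XCom) into the
# dataset event's extra / the triggered DAG run conf, instead of adding
# a new proxy object.
@task(outlets=[Dataset("s3://bucket/key", extra_from_return=True)])
def producer():
    ...  # Write the file to S3
    return {"filename": "part-0001.parquet", "batch_id": "1234"}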

@uranusjr
Member Author

uranusjr commented Mar 4, 2024

I like the idea. How would this work if the task writes to more than one dataset though?

Another thing I’ve been thinking about is giving XCom a dataset URI so we can track lineage of its values (also tying back to the read/write XCom via Object Store idea). This raises a question: what should we do if we want to use XCom for the “actual” data when it is already being used for the extra?

Eventually what I think we should do is provide some sort of “output management” mechanism that generalises XCom: if XCom is a kind of dataset, its metadata is conceptually just automatically populated dataset metadata. So the return value should still be the actual data we want to write and that downstream tasks depend on (with where and how the data is stored being customisable), and metadata should be provided another way. I’m not entirely sure what the end result should look like, or how to smoothly transition toward it.

@jscheffl
Contributor

jscheffl commented Mar 5, 2024

I like the idea. How would this work if the task writes to more than one dataset though?

I believe an extension could be an option to pick which XCom (as an alternative output from the task) is used to fill the extra. This might be another increment, or if there is concrete demand it could be done right here.

Another thing I’ve been thinking about is giving XCom a dataset URI so we can track lineage of its values (also tying back to the read/write XCom via Object Store idea). This raises a question: what should we do if we want to use XCom for the “actual” data when it is already being used for the extra?

I understand the idea of XCom with a dataset URI. But would this URI refer to a specific DAG run, or to the abstract "last" run? One would be a "moving target" and the other would mean a dataset URI per run, i.e. many, many URIs to track... or do I misunderstand? Can you give an example?

Eventually what I think we should do is provide some sort of “output management” mechanism that generalises XCom: if XCom is a kind of dataset, its metadata is conceptually just automatically populated dataset metadata. So the return value should still be the actual data we want to write and that downstream tasks depend on (with where and how the data is stored being customisable), and metadata should be provided another way. I’m not entirely sure what the end result should look like, or how to smoothly transition toward it.

When you ask this question, I understand it would add a new, complex area of XCom management and data flow. At the moment XCom is quite simple to use as a key/value pair to pass data. It does not conform to a schema (e.g. JSON validation/pydantic model) and can be any type.
I don't see XCom as being a dataset per se; it is just a data fragment passed as output for some other input, within a DAG between tasks. The core idea is: if it can be used between tasks, why not use the same facility between DAGs when data-triggered?
Schema checks and other features for XCom could still be added, independent of the dataset trigger mechanism, also for regular use between tasks in DAGs. I see that as independent of this concept.
Also, the DAG author would have full freedom (if there is a need to manage the data structure) to have a Python task proxy the XCom information or restructure it before the dataset event is emitted. I believe there is no critical need for extra complexity when another Python task in the workflow can also fix this.

@uranusjr
Member Author

uranusjr commented Mar 7, 2024

Since XCom is just data storage, it can be used like an external S3 file, or a database the user sets up; it is just a bit more automated and carries some metadata. I feel it is reasonable to assign a dataset URI to each key-value pair, so a dataset event is triggered when a key-value pair is written. This makes cross-DAG XCom usage more useful IMO, since it allows a downstream DAG to declare a dependency on the upstream at the DAG level (via a dataset).
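As a thought experiment only, that could be modelled as one dataset per XCom key; the xcom:// URI scheme below is invented and not an existing Airflow convention:

from airflow.datasets import Dataset
from airflow.decorators import task

# Hypothetical: one dataset per XCom key; writing the key would emit a
# dataset event that downstream DAGs can depend on.
xcom_as_dataset = Dataset("xcom://example_dag/producer/return_value")

@task(outlets=[xcom_as_dataset])
def producer():
    return {"rows": 42}  # the XCom value itself is the "data" being produced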

With that established, if we store extra metadata (of a dataset), it only makes sense to also allow extra metadata when an XCom is written. But if we use XCom for the extra, writing to XCom would write… extra metadata to XCom? And does that extra metadata also have a dataset URI, and can it have extra extra metadata? It becomes awkward.

@jscheffl
Contributor

jscheffl commented Mar 7, 2024

With that established, if we store extra metadata (of a dataset), it only makes sense to also allow extra metadata when an XCom is written. But if we use XCom for the extra, writing to XCom would write… extra metadata to XCom? And does that extra metadata also have a dataset URI, and can it have extra extra metadata? It becomes awkward.

I would see it as "we use the existing XCom metadata" and do not add a new kind. The data is just copied on top into the next DAG run conf.

What therefore comes to my mind: we could also add a flag to the outlet parameter. As extra is optional and, if specified, static, we could set another marker indicating whether the task return value (i.e. the XCom) shall be taken over as extra event data. Otherwise, by default, no data is taken over.

@uranusjr
Member Author

whether the task return value (i.e. the XCom) shall be taken over as extra event data.

So if the marker is set, the return value goes to the dataset event’s extra, instead of (not in addition to) the xcom table (XCom model)?

I think what makes me uncomfortable about using XCom is that the model doesn’t attach any special semantics to the data stored in it. At least some people likely use it as generic storage for data, instead of for metadata (of the data). This means we have no guaranteed way to tell whether a value in there is supposed to be metadata (associated with other data) or just arbitrary data. But if the metadata does not go into the xcom table (but somewhere else instead), I think that’s fine.

Another way to do this would be to introduce a special type to return from a task function, like:

from airflow.datasets import Dataset, Metadata
from airflow.decorators import task
from airflow.io.path import ObjectStoragePath

@task(outlets=[Dataset("s3://my/data.json")])
def my_task():
    with ObjectStoragePath("s3://my/data.json").open("w") as f:
        ... # Write to file...
    return Metadata(uri="s3://my/data.json", extra={"extra": "metadata"})

This is maybe more visible than setting a flag:

#                                           easier to miss?
@task(outlets=[Dataset("s3://my/data.json", event_extra_source="xcom")])
def my_task():
    with ObjectStoragePath("s3://my/data.json").open("w") as f:
        ... # Write to file...
    # Need to double check above to understand what this return implies.
    return {"extra": "metadata"}

@uranusjr uranusjr added this to the Airflow 2.10.0 milestone Mar 20, 2024
@uranusjr
Member Author

I gave this a pretty long thought. I am leaning toward implementing the return Metadata(...) syntax mentioned above, but with a little flair to solve the issue that it conflicts with XCom, by allowing yield as well:

@task(outlets=[Dataset("s3://my/data.json")])
def my_task():
    with ObjectStoragePath("s3://my/data.json").open("w") as f:
        ... # Write to file...
    yield Metadata(uri="s3://my/data.json", extra={"extra": "metadata"})
    return data  # This goes to XCom!

The thing I particularly like about this is that in the future, when XCom gets its own lineage information and can also take additional metadata, we can introduce another special type to allow passing in data and metadata at the same time:

@task(outlets=[Dataset("s3://my/data.json")])
def my_task():
    with ObjectStoragePath("s3://my/data.json").open("w") as f:
        ... # Write to file...
    return Output(data, extra={"extra": "metadata"})

This also opens the door for sending multiple things from one single function if we allow yield Output(...). I can think of future extensions where the return value does not go to XCom storage, but to whatever is specified in outlets directly (without needing to explicitly write the data in the function). There are a lot of opportunities.
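Purely for illustration, a multi-yield variant of that possible future extension might look like the following; Output and the write-through behaviour are hypothetical:

@task(outlets=[Dataset("s3://my/a.json"), Dataset("s3://my/b.json")])
def my_task():
    # Hypothetical future syntax: each yielded Output carries both the data
    # to write to one outlet and the metadata for its dataset event.
    yield Output({"kind": "a"}, uri="s3://my/a.json", extra={"rows": 10})
    yield Output({"kind": "b"}, uri="s3://my/b.json", extra={"rows": 20})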

That said, I think implementing the context-based approach is still a good first step toward all this. Even with the more magical and convenient return-as-metadata syntax, using a context variable is still explicit and may be preferred by some. It is also easier to implement, and should be a good way to start things rolling without getting into a ton of syntax design, focusing on the core feature here. So I’m going to start with that first.

@jscheffl
Contributor

This also opens the door for sending multiple things from one single function if we allow yield Output(...). I can think of future extensions where the return value does not go to XCom storage, but to whatever is specified in outlets directly (without needing to explicitly write the data in the function). There are a lot of opportunities.

I'd like to have this option; I also thought of this, and had the idea of a "future extension" as well, with the primary intent to keep it simple first :-D

That said, I think implementing the context-based approach is still a good first step toward all this. Even with the more magical and convenient return-as-metadata syntax, using a context variable is still explicit and may be preferred by some. It is also easier to implement, and should be a good way to start things rolling without getting into a ton of syntax design, focusing on the core feature here. So I’m going to start with that first.

Looking forward to a PR.

What do you think about making both your proposal and mine possible, depending on what the user needs? In many cases the extra information might not be used at all, or the existing mechanisms are totally fine. Something like:

@task(outlets=[Dataset("s3://my/data.json", extra_from_return=True)])

or

@task(outlets=[Dataset("s3://my/data.json", extra_from_xcom="dataset_key")])

…whereas in your notation with the yield, you explicitly push the dataset event, I assume already before task completion. Instead of a uri, might it be better to pass the target Dataset directly?

@uranusjr
Member Author

uranusjr commented Apr 1, 2024

The core mechanism to set DatasetEvent.extra is implemented in #38481. I’ll move on to implementing yield Metadata(...) from a task next. This might take a while, since I can see how Mypy would be unhappy about execute being either a plain function or a generator function…
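For context, one way the executor-side code could distinguish the two shapes at run time; this is a sketch only, not the actual implementation, and the function name is invented:

import inspect

def run_task_callable(fn, *args, **kwargs):
    # Sketch: a generator-style task yields Metadata objects and can still
    # return an XCom value via StopIteration; a plain callable just returns.
    if inspect.isgeneratorfunction(fn):
        gen = fn(*args, **kwargs)
        collected_metadata = []
        while True:
            try:
                collected_metadata.append(next(gen))
            except StopIteration as stop:
                # The generator's return value still goes to XCom.
                return stop.value, collected_metadata
    return fn(*args, **kwargs), []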
