
Use Task Output as Extra for Dataset Trigger and DAG Run Conf #38432

Conversation


@jscheffl jscheffl commented Mar 23, 2024

This PR proposes a thin approach to resolving the feature request described in #37810.

Idea for this solution:

  • Allow using the task result as a dynamic extra
    • If the dataset defines extra_from_return=True, use the task result as the extra
  • Pass the dict of extras to the dataset-triggered DAG as its configuration/parameters
    • If multiple events trigger a DAG, the extra dictionaries are merged
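The proposed behaviour can be sketched in plain Python, independent of Airflow internals (the function names here are illustrative only, not actual Airflow API):

```python
def resolve_event_extra(dataset_extra, extra_from_return, result, task_id):
    """Proposed resolution order: a statically defined extra wins,
    otherwise fall back to the task's return value."""
    if dataset_extra:
        return dataset_extra
    if extra_from_return:
        # Non-dict results are wrapped under the task id so the
        # merged run conf stays a dict.
        return result if isinstance(result, dict) else {str(task_id): result}
    return {}


def build_run_conf(event_extras):
    """Merge the extras of all triggering events into one DAG run conf."""
    run_conf = {}
    for extra in event_extras:
        if extra:
            run_conf.update(extra)
    return run_conf
```

This is only a model of the PR's intent; the actual change lives in `TaskInstance` and the scheduler, as the diff hunks below show.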

This PR supersedes PR #37888.

closes: #37810

@jscheffl jscheffl requested a review from uranusjr March 23, 2024 22:31
@boring-cyborg boring-cyborg bot added area:Scheduler including HA (high availability) scheduler kind:documentation labels Mar 23, 2024
if TYPE_CHECKING:
    assert self.task

for obj in self.task.outlets or []:
    self.log.debug("outlet obj %s", obj)
    # Lineage can have other types of objects besides datasets
    if isinstance(obj, Dataset):
        if obj.extra:
Contributor:

Can we merge the event static information with dynamic information?

Contributor Author:

No, we should not :-(
I (previously) interpreted the extra field as dynamic data. But since it is actually intended to be metadata for the dataset itself, I misinterpreted it as an option to pass extra data along with this "reference" to a Dataset (identified by the URI).

Comment on lines +2565 to +2566
if obj.extra:
    extra = obj.extra
Member:

I do not think this is right. It’s made quite clear in previous issues that Dataset.extra and DatasetEvent.extra are different things and should be kept separate.

Contributor Author:

Sorry, then it seems I misunderstood your comments in the previous PR discussions about this. Thanks for clearly documenting the differences between the Dataset and DatasetEvent extra fields in #38481. That opened my eyes, and now I understand your push-back.
Now it is "clear" to me. The intent was not to overwrite or mangle "static" information for events.

if obj.extra:
    extra = obj.extra
elif obj.extra_from_return:
    extra = result if isinstance(result, dict) else {str(self.task_id): result}
Member:

Automatically putting the value under a key is too magical to me. I would prefer that this either just forward the value (the extra field can store any JSON-able value, after all) or skip the value entirely if it has an unexpected type. The task ID is not a particularly obvious key either.

Comment on lines +1285 to +1288
run_conf = {}
for event in dataset_events:
    if event.extra:
        run_conf.update(event.extra)
Member:

I would prefer we put this in a separate PR to discuss. It’s not entirely obvious how extras should be merged from different events that trigger the run.
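The ambiguity raised here is easy to demonstrate: with a plain `dict.update` merge, the last event processed silently wins on key collisions (the paths below are made up):

```python
# Two triggering events write the same key; dict.update keeps the
# value from whichever event is processed last, so the resulting
# run conf depends on event ordering.
event_extras = [{"path": "s3://bucket/a"}, {"path": "s3://bucket/b"}]

run_conf = {}
for extra in event_extras:
    run_conf.update(extra)

print(run_conf)  # {'path': 's3://bucket/b'} -- last writer wins
```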

@uranusjr (Member):

I also don’t quite feel comfortable that the return value is still pushed to XCom if it is used as the event extra. IMO it should go to one or the other, not both.

@@ -2455,7 +2456,7 @@ def _run_raw_task(

 try:
     if not mark_success:
-        self._execute_task_with_callbacks(context, test_mode, session=session)
+        result = self._execute_task_with_callbacks(context, test_mode, session=session)
Reviewer:

  1. It seems that not all operators return a result on execution (example: DatabricksSubmitRunOperator).
  2. Some operators do return a result, like GlueOperator and EmrServerlessStartJobOperator, but in those cases the return value is a job_id.

Questions:

  1. If any of the above operators publishes an Airflow dataset, how do you specify the extra dictionary in the corresponding dataset?

Contributor Author:

Regarding 1+2: Yes, if no result is generated, the content will just be None. If it is a scalar like a job_id, it is a string. In such cases the output is not usable for passing along to the dataset event.

Regarding your question 1: The event is actually published after this line of code. The change was attempting to capture the result so it can be put into the extra a few lines further down.

@uranusjr (Member):

I gave this a longer thought and I think I’m not particularly fond of the approach mainly because it mixes Dataset.extra and DatasetEvent.extra. As described in #37810, the current design is very explicit on the two being separate—Dataset.extra describes the thing that’s pointed to by the URI, while DatasetEvent.extra describes the data written into the thing represented by the URI. I quoted a few paragraphs from @blag in the issue. The design here directly passes through Dataset.extra to DatasetEvent.extra, which I believe would not make sense to most people. (For the record, I actually proposed copying Dataset.extra to DatasetEvent.extra initially, but @jedcunningham convinced me to not do it before I published #37810.)

The extra_from_return argument also has the same problem fundamentally—the flag controlling DatasetEvent behaviour should not be set on Dataset. It should be on the task instead (or on DatasetEvent itself, but that’s awkward because the event does not exist conceptually when we want to emit the extra).
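The separation described above can be modeled in a few lines (the class names mirror the Airflow concepts, but this is a simplified sketch, not the real implementation, and the example values are made up):

```python
from dataclasses import dataclass, field


@dataclass
class Dataset:
    # Describes the thing behind the URI; extra is static metadata
    # attached at definition time.
    uri: str
    extra: dict = field(default_factory=dict)


@dataclass
class DatasetEvent:
    # Describes one write to that thing; extra is per-event, emitted
    # by the producing task, and never copied from Dataset.extra.
    dataset: Dataset
    extra: dict = field(default_factory=dict)


ds = Dataset("s3://bucket/orders", extra={"owner": "team-data"})
event = DatasetEvent(ds, extra={"rows_written": 1234})

# The two extras describe different things and stay separate.
assert ds.extra != event.extra
```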

@jscheffl (Contributor Author):

> I gave this a longer thought and I think I’m not particularly fond of the approach mainly because it mixes Dataset.extra and DatasetEvent.extra. As described in #37810, the current design is very explicit on the two being separate—Dataset.extra describes the thing that’s pointed to by the URI, while DatasetEvent.extra describes the data written into the thing represented by the URI. I quoted a few paragraphs from @blag in the issue. The design here directly passes through Dataset.extra to DatasetEvent.extra, which I believe would not make sense to most people. (For the record, I actually proposed copying Dataset.extra to DatasetEvent.extra initially, but @jedcunningham convinced me to not do it before I published #37810.)
>
> The extra_from_return argument also has the same problem fundamentally—the flag controlling DatasetEvent behaviour should not be set on Dataset. It should be on the task instead (or on DatasetEvent itself, but that’s awkward because the event does not exist conceptually when we want to emit the extra).

Thanks for sharing your concerns. After reading #38481 I better understand what you meant. I did not understand the difference between Dataset.extra (I always saw "Dataset" as the holder of the URI pointing to a dataset) and DatasetEvent.extra (which I had not noticed before; I expected it to be the result of the outlet from execution, not a definition made by the user). Your enhancement of the docs in PR #38481 makes it clear now.

Therefore I'm now in favor of PR #38481, where you propose a simple but explicit way to attach an extra per outlet without mixing it with the return value or XCom.

Yes, and indeed, merging multiple extras into the DAG run conf is a separate topic; I saw it as a two-for-one in this PR, publishing the extra dynamically and also showing how to easily consume it.

So I will close this "test balloon" PR and "maybe" open a separate PR on using the extra as DAG run conf... if time permits.

@jscheffl jscheffl closed this Mar 27, 2024

Successfully merging this pull request may close these issues.

Annotate a Dataset Event in the Source Task