
Add dataset update endpoint #29433

Conversation

@michaelmicheal (Contributor) commented Feb 8, 2023

Closes: #29162
To support integration with external services that logically update real-world datasets, this PR:

  • makes task_instance optional for registering a dataset change
  • adds a POST endpoint to create dataset events through the API (a hypothetical request is sketched below)
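For illustration, a call to the proposed endpoint might look roughly like the sketch below. The endpoint path and payload schema are assumptions based on this draft (only the "dataset_uri" field and the optional extra dict are visible in the code under review), not the final API:

import requests

# Hypothetical call against the draft endpoint; the path "/api/v1/datasets/events" is assumed.
resp = requests.post(
    "http://localhost:8080/api/v1/datasets/events",
    auth=("admin", "admin"),  # basic auth for a local test deployment
    json={
        "dataset_uri": "s3://warehouse/orders",
        "extra": {"triggered_by": "external-ingestion-service"},
    },
)
resp.raise_for_status()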


@boring-cyborg boring-cyborg bot added the area:API Airflow's REST/HTTP API label Feb 8, 2023
@michaelmicheal michaelmicheal marked this pull request as ready for review February 8, 2023 19:51
@ephraimbuddy (Contributor) left a comment


I'm not sure of this, like the broadcasted event has no source dag/task etc.
cc: @dstandish

Outdated review threads (resolved): airflow/api_connexion/openapi/v1.yaml (two threads), airflow/api_connexion/endpoints/dataset_endpoint.py
@michaelmicheal (Contributor, PR author):

I'm not sure of this, like the broadcasted event has no source dag/task etc. cc: @dstandish

There are a few reasons why I think it's important to at least support (not necessarily encourage) external dataset changes.

  1. Integrating with external services and non-Airflow components of a pipeline. If a data science team has an external component of an ETL pipeline (for example, data warehouse ingestion), these external services should be able to trigger workflows that depend on datasets when those datasets are updated externally.
  2. Supporting multi-instance Airflow architectures. With Astro, Cloud Composer, and custom solutions (like ours at Shopify), running multiple Airflow instances in production is very common. When one layer of the data platform is orchestrated in one instance and another layer in a different instance, we rely on being able to broadcast dataset changes between Airflow instances. We need this API to pass dataset changes between instances (see the consumer-side sketch below).
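To make the multi-instance scenario concrete, here is a minimal sketch of the consumer side: a DAG in the downstream Airflow instance scheduled on a dataset URI. This uses standard dataset scheduling (Airflow 2.4+) and is not part of this PR; an external POST to the new endpoint in that instance would record a dataset event and trigger this DAG.

from __future__ import annotations

import pendulum

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator

orders = Dataset("s3://warehouse/orders")

with DAG(
    dag_id="consume_orders",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=[orders],  # runs whenever a dataset event for this URI is recorded in this instance
):
    EmptyOperator(task_id="downstream_processing")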

Outdated review threads (resolved): airflow/api_connexion/endpoints/dataset_endpoint.py, airflow/datasets/manager.py
else:
    # When an external dataset change is made through the API, it isn't triggered by a task instance,
    # so we create a DatasetEvent without the task and dag data.
    dataset_event = DatasetEvent(
A Contributor commented:


It would be great to have extra information available when the dataset has been changed externally, such as:

  • by whom: external_auth_id or external_service_id -> required
  • from where (api, client_ip / remote_addr): external_source -> required
  • the timestamp of the actual event, so it can be reconciled if required -> nullable, as it might not be available

This ensures lineage isn't broken across systems.
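As a rough sketch of what this comment asks for, the columns below use the names suggested above; they are the reviewer's suggestions, not what the PR currently implements:

from sqlalchemy import Column, String

from airflow.utils.sqlalchemy import UtcDateTime

# Suggested audit fields on the dataset event record (names from the review comment above).
external_auth_id = Column(String(250), nullable=False)         # by whom: auth/service identity
external_source = Column(String(250), nullable=False)          # from where: api, client ip, etc.
external_event_timestamp = Column(UtcDateTime, nullable=True)  # original event time, if available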

@michaelmicheal (Contributor, PR author) commented Feb 21, 2023


What do you think of the latest changes @bolkedebruin?

Outdated review thread (resolved): airflow/models/dataset.py
@bolkedebruin (Contributor):

So, I like where this is going, but I'd like some extra robustness / proper security (see above). Furthermore, we need to think about how this API will be used.

For example, I expect the majority of usage to come from cloud storage integration. S3 (+ MinIO), GCS, and ABS all use their own callback schemas, and ideally we would allow providers to register these kinds of callbacks. The question becomes how to 'detect' which service we are integrating with, without creating a lot of work for ourselves by needing to expose every flavor of callback as a separate API. I quite understand that this is beyond the scope of your PR, but it gives a dot on the horizon, so to say.

I think with the security concerns addressed and unit tests added it looks mergeable. I'm a bit concerned about the schema and schema evolution. How is that going to work?

@dimberman (Contributor):

LGTM once @bolkedebruin's comments are addressed.

@michaelmicheal michaelmicheal requested review from bolkedebruin and removed request for bolkedebruin March 9, 2023 14:17
@uranusjr uranusjr dismissed their stale review March 13, 2023 08:26

Not ready

@ashb ashb modified the milestones: Airflow 2.6.1, Airflow 2.7.0 May 6, 2023
@DjVinnii (Contributor):

This feature looks like something we can use. I had a quick look and one thing came to mind:

It looks like it is not possible to create a remote dataset event if the dataset does not already exist. Is that correct? In my opinion, it would also be nice if we could create a dataset via the API.

from sqlalchemy import func
from sqlalchemy.orm import Session, joinedload, subqueryload

from airflow import Dataset
@uranusjr (Member) commented May 18, 2023


This should import from airflow.datasets instead.
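That is, presumably:

from airflow.datasets import Dataset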

raise BadRequest(detail=str(err))
uri = json_body["dataset_uri"]
external_source = request.remote_addr
user_id = getattr(current_user, "id", None)
A Member commented:


Instead of the ID, I feel this should use the username. The ID from the database should be considered kind of an implementation detail.

A Contributor commented:


isn't the id how a typical FK->PK relationship is defined?
it seems appropriate to add user_id and make an FK to the users table; then one could always look up the username by joining?

@uranusjr (Member) commented May 18, 2023


Yeah, good point, this should probably just be an FK.

Edit: Using an FK has problems when a user is deleted, though. We probably don't want to lose the triggering history in that case.

A Contributor commented:


true

A Contributor commented:


i think there's a mechanism to set it to null when the user is deleted?
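The mechanism being referred to is presumably a nullable foreign key with ON DELETE SET NULL. A minimal SQLAlchemy sketch, with an illustrative column name and assuming the FAB ab_user table:

from sqlalchemy import Column, ForeignKey, Integer

# Nullable FK that the database sets to NULL if the referenced user row is deleted.
created_by_user_id = Column(
    Integer,
    ForeignKey("ab_user.id", ondelete="SET NULL"),
    nullable=True,
)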

A Member commented:


Setting this to a (non-fk) username would be much more useful than having a null IMO.

@dstandish (Contributor) commented May 18, 2023


I don't stand in the way here, and I don't really mind, but if you'll humor me I'll think it through out loud...

Maybe I'm just hung up on the standard practice of using surrogate keys, normalization, etc., but...

So, Airflow provides a mechanism to deactivate users. Interestingly, the user edit form even says don't delete users; it's a bad practice; just deactivate them.

[screenshot of the user edit form]

Additionally, a username can be changed. So I could take some action, change my username, and now you don't know that I took that action.

Additionally, you could delete a user and then add a new user with the same username, but it would be a different "user".

I think your point that having a username is more useful than having a null is obviously true. But my thought is that it's not a situation that should happen, because users should not be deleted (by keeping them we don't run into these problems), and if an Airflow cluster admin does delete them, well, that's up to them. But presuming they don't, you get the benefits of referential integrity.

Interestingly, the log table does not have a user_id column, which is a bit weird... it probably should... but there too I'd say it would make sense to record the db ID.

Another option would be to reject the deletion of a user when there are associated log or dataset events. That would seem reasonable too.

So yeah, I think I've convinced myself a bit more that using the ID is the right way. I think that the mutability of username is a strong argument in light of security / auditing concerns. But lmkwyt.

@dstandish (Contributor) commented May 18, 2023


I guess the other reason, @uranusjr, would be so that we can use standard ORM features such as relationships to get from user to dataset and vice versa, but... I suppose you could say you could still do that with username via custom join conditions 🤷

A Contributor commented:


Brought this up on Slack. The suggestion is to not add a user column at all right now, since user handling will change with AIP-56. For auditing purposes, you can add a record to the Log table.
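For illustration, recording the external event in the audit log might look roughly like the sketch below. The event name is made up, and uri, request, current_user, and session are assumed to come from the endpoint context quoted elsewhere in this PR:

import json

from airflow.models.log import Log

# Hypothetical audit record for an externally created dataset event.
session.add(
    Log(
        event="dataset_event.external",  # illustrative event name, not from the PR
        owner=getattr(current_user, "username", None),
        extra=json.dumps({"dataset_uri": uri, "remote_addr": request.remote_addr}),
    )
)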

"timestamp": self.default_time,
}

def test_should_raises_401_unauthenticated(self, session):
A Contributor commented:


Suggested change:
-    def test_should_raises_401_unauthenticated(self, session):
+    def test_should_raise_401_unauthenticated(self, session):

@dstandish (Contributor):

This feature looks like something we can use. I had a quick look and one thing came to mind:

It looks like it is not possible to create a remote dataset event if the dataset does not already exist. Is that correct? In my opinion, it would also be nice if we could create a dataset via the API.

Yeah, it's a good idea @DjVinnii, perhaps you'd be interested in contributing that? Although, if the dataset does not exist, then nothing on the cluster is using it, so creating an event for it would not really have any effect.

session=session,
)

if dataset_event:
A Contributor commented:


i wish it threw a helpful exception if there is no dataset instead of using None to signal that. though i don't think there's anything we can do about that now, can we @uranusjr ?
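One way the new endpoint could fail loudly is sketched below; this is an illustration of the idea, not what the PR currently does. NotFound is the existing api_connexion exception, and dataset_event / uri come from the endpoint context quoted above:

from airflow.api_connexion.exceptions import NotFound

# If no dataset event could be created, the dataset URI is unknown to this instance.
if dataset_event is None:
    raise NotFound(
        "Dataset not found",
        detail=f"No dataset with uri {uri!r} is registered in this Airflow instance.",
    )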

A Contributor commented:


oh wait... this is a new method. so we could. but wait do we even need a new method?

@dstandish (Contributor) commented May 18, 2023


@michaelmicheal why do we need a new method for this? could we not add params to register_dataset_change?

A Contributor commented:


since the params are kwargs-only, i reckon we could make task_instance optional.

and, thankfully, since it accepts **kwargs, adding more params at the call site in airflow won't break anything for "old" custom dataset managers
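A minimal sketch of that suggestion, with parameter names approximated from the existing DatasetManager.register_dataset_change API rather than copied from this PR:

from __future__ import annotations

from sqlalchemy.orm import Session

from airflow.datasets import Dataset
from airflow.models.taskinstance import TaskInstance


class DatasetManager:
    def register_dataset_change(
        self,
        *,
        task_instance: TaskInstance | None = None,  # optional: None for API-created events
        dataset: Dataset,
        extra=None,
        session: Session,
        **kwargs,
    ) -> None:
        # When task_instance is None, the event was created externally (e.g. via the REST API),
        # so the source task/dag columns on the DatasetEvent are simply left unset.
        ...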

except ValidationError as err:
raise BadRequest(detail=str(err))
uri = json_body["dataset_uri"]
external_source = request.remote_addr
A Contributor commented:


i am not sure that remote_addr is the right choice here. maybe we could record such information in the log table? but to me it seems it might be more useful to let this be an arbitrary text field? although i suppose the user can always supply information in the extra dict... wdyt?

@DjVinnii (Contributor):

Yeah, it's a good idea @DjVinnii, perhaps you'd be interested in contributing that? Although, if the dataset does not exist, then nothing on the cluster is using it, so creating an event for it would not really have any effect.

Our use case is to be able to synchronize datasets between multiple Airflow instances, so that consumers only have to know the dataset name and don't have to be aware of which instance the producer DAG is in. At the moment we are creating and updating datasets across Airflow instances by using a hacky dummy dag that produces the dataset in the other instances, but an API seems much more robust.

I'm willing to give this a try and contribute.

@github-actions bot commented Jul 4, 2023

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Jul 4, 2023
@github-actions github-actions bot closed this Jul 9, 2023
@marclamberti:

Is this feature going to be merged for 2.7?

@uranusjr (Member):

This is not finished and cannot be merged. If you are interested in the feature, please open a new pull request and work on it.

Successfully merging this pull request may close: Support External Dataset Updates (#29162).