Skip to content

Commit

Permalink
fix(ingestion/airflow-plugin): pipeline tasks discoverable in search (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
dushayntAW authored and yoonhyejin committed Jul 16, 2024
1 parent 1e32d73 commit 8f19d37
Show file tree
Hide file tree
Showing 8 changed files with 373 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -536,14 +536,27 @@ def on_dag_start(self, dag_run: "DagRun") -> None:
)
dataflow.emit(self.emitter, callback=self._make_emit_callback())

event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityUrn=str(dataflow.urn), aspect=StatusClass(removed=False)
)
self.emitter.emit(event)

for task in dag.tasks:
task_urn = builder.make_data_job_urn_with_flow(
str(dataflow.urn), task.task_id
)
event = MetadataChangeProposalWrapper(
entityUrn=task_urn, aspect=StatusClass(removed=False)
)
self.emitter.emit(event)

# emit tags
for tag in dataflow.tags:
tag_urn = builder.make_tag_urn(tag)

event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
event = MetadataChangeProposalWrapper(
entityUrn=tag_urn, aspect=StatusClass(removed=False)
)

self.emitter.emit(event)

browse_path_v2_event: MetadataChangeProposalWrapper = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,28 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)",
Expand All @@ -72,6 +83,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,28 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
Expand Down Expand Up @@ -102,6 +124,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
Expand All @@ -73,6 +84,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
Expand Down Expand Up @@ -102,6 +124,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,28 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,snowflake_operator,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,snowflake_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,snowflake_operator,prod)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,28 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)",
Expand Down Expand Up @@ -102,6 +124,39 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),populate_cost_table)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)",
Expand Down Expand Up @@ -155,6 +210,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)",
Expand Down
Loading

0 comments on commit 8f19d37

Please sign in to comment.