Skip to content

Commit

Permalink
[OpenLineage] Add support for JobTypeJobFacet properties. (#37255)
Browse files Browse the repository at this point in the history
* Add support for JobTypeJobFacet properties.

Signed-off-by: Mattia Bertorello <[email protected]>

* Use the method _build_job to create the Job object for DAG events

Signed-off-by: Mattia Bertorello <[email protected]>

---------

Signed-off-by: Mattia Bertorello <[email protected]>
Co-authored-by: Maciej Obuchowski <[email protected]>
  • Loading branch information
mattiabertorello and mobuchowski authored Feb 19, 2024
1 parent f2ea8a3 commit 1851a71
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 13 deletions.
21 changes: 16 additions & 5 deletions airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
BaseFacet,
DocumentationJobFacet,
ErrorMessageRunFacet,
JobTypeJobFacet,
NominalTimeRunFacet,
OwnershipJobFacet,
OwnershipJobFacetOwners,
Expand Down Expand Up @@ -56,6 +57,12 @@

set_producer(_PRODUCER)

# https://openlineage.io/docs/spec/facets/job-facets/job-type
# They must be set after the `set_producer(_PRODUCER)`
# otherwise the `JobTypeJobFacet._producer` will be set with the default value
_JOB_TYPE_DAG = JobTypeJobFacet(jobType="DAG", integration="AIRFLOW", processingType="BATCH")
_JOB_TYPE_TASK = JobTypeJobFacet(jobType="TASK", integration="AIRFLOW", processingType="BATCH")


class OpenLineageAdapter(LoggingMixin):
"""Translate Airflow metadata to OpenLineage events instead of creating them from Airflow code."""
Expand Down Expand Up @@ -181,6 +188,7 @@ def start_task(
),
job=self._build_job(
job_name=job_name,
job_type=_JOB_TYPE_TASK,
job_description=job_description,
code_location=code_location,
owners=owners,
Expand Down Expand Up @@ -222,7 +230,7 @@ def complete_task(
parent_run_id=parent_run_id,
run_facets=task.run_facets,
),
job=self._build_job(job_name, job_facets=task.job_facets),
job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets),
inputs=task.inputs,
outputs=task.outputs,
producer=_PRODUCER,
Expand Down Expand Up @@ -259,7 +267,7 @@ def fail_task(
parent_run_id=parent_run_id,
run_facets=task.run_facets,
),
job=self._build_job(job_name, job_facets=task.job_facets),
job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets),
inputs=task.inputs,
outputs=task.outputs,
producer=_PRODUCER,
Expand All @@ -276,7 +284,7 @@ def dag_started(
event = RunEvent(
eventType=RunState.START,
eventTime=dag_run.start_date.isoformat(),
job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE),
job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
run=self._build_run(
run_id=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id),
job_name=dag_run.dag_id,
Expand All @@ -293,7 +301,7 @@ def dag_success(self, dag_run: DagRun, msg: str):
event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=dag_run.end_date.isoformat(),
job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE),
job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
run=Run(runId=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id)),
inputs=[],
outputs=[],
Expand All @@ -305,7 +313,7 @@ def dag_failed(self, dag_run: DagRun, msg: str):
event = RunEvent(
eventType=RunState.FAIL,
eventTime=dag_run.end_date.isoformat(),
job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE),
job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
run=Run(
runId=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id),
facets={"errorMessage": ErrorMessageRunFacet(message=msg, programmingLanguage="python")},
Expand Down Expand Up @@ -350,6 +358,7 @@ def _build_run(
@staticmethod
def _build_job(
job_name: str,
job_type: JobTypeJobFacet,
job_description: str | None = None,
code_location: str | None = None,
owners: list[str] | None = None,
Expand All @@ -372,4 +381,6 @@ def _build_job(
if job_facets:
facets = {**facets, **job_facets}

facets.update({"jobType": job_type})

return Job(_DAG_NAMESPACE, job_name, facets)
83 changes: 75 additions & 8 deletions tests/providers/openlineage/plugins/test_openlineage_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
DocumentationJobFacet,
ErrorMessageRunFacet,
ExternalQueryRunFacet,
JobTypeJobFacet,
NominalTimeRunFacet,
OwnershipJobFacet,
OwnershipJobFacetOwners,
Expand Down Expand Up @@ -163,7 +164,12 @@ def test_emit_start_event(mock_stats_incr, mock_stats_timer):
job=Job(
namespace=_DAG_NAMESPACE,
name="job",
facets={"documentation": DocumentationJobFacet(description="description")},
facets={
"documentation": DocumentationJobFacet(description="description"),
"jobType": JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW", jobType="TASK"
),
},
),
producer=_PRODUCER,
inputs=[],
Expand Down Expand Up @@ -244,6 +250,9 @@ def test_emit_start_event_with_additional_information(mock_stats_incr, mock_stat
]
),
"sql": SqlJobFacet(query="SELECT 1;"),
"jobType": JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW", jobType="TASK"
),
},
),
producer=_PRODUCER,
Expand Down Expand Up @@ -284,7 +293,15 @@ def test_emit_complete_event(mock_stats_incr, mock_stats_timer):
eventType=RunState.COMPLETE,
eventTime=event_time,
run=Run(runId=run_id, facets={}),
job=Job(namespace=_DAG_NAMESPACE, name="job", facets={}),
job=Job(
namespace=_DAG_NAMESPACE,
name="job",
facets={
"jobType": JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW", jobType="TASK"
)
},
),
producer=_PRODUCER,
inputs=[],
outputs=[],
Expand Down Expand Up @@ -338,7 +355,16 @@ def test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s
"externalQuery": ExternalQueryRunFacet(externalQueryId="123", source="source"),
},
),
job=Job(namespace=_DAG_NAMESPACE, name="job", facets={"sql": SqlJobFacet(query="SELECT 1;")}),
job=Job(
namespace="default",
name="job",
facets={
"sql": SqlJobFacet(query="SELECT 1;"),
"jobType": JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW", jobType="TASK"
),
},
),
producer=_PRODUCER,
inputs=[
Dataset(namespace="bigquery", name="a.b.c"),
Expand Down Expand Up @@ -377,7 +403,15 @@ def test_emit_failed_event(mock_stats_incr, mock_stats_timer):
eventType=RunState.FAIL,
eventTime=event_time,
run=Run(runId=run_id, facets={}),
job=Job(namespace=_DAG_NAMESPACE, name="job", facets={}),
job=Job(
namespace=_DAG_NAMESPACE,
name="job",
facets={
"jobType": JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW", jobType="TASK"
)
},
),
producer=_PRODUCER,
inputs=[],
outputs=[],
Expand Down Expand Up @@ -431,7 +465,16 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
"externalQuery": ExternalQueryRunFacet(externalQueryId="123", source="source"),
},
),
job=Job(namespace=_DAG_NAMESPACE, name="job", facets={"sql": SqlJobFacet(query="SELECT 1;")}),
job=Job(
namespace="default",
name="job",
facets={
"sql": SqlJobFacet(query="SELECT 1;"),
"jobType": JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW", jobType="TASK"
),
},
),
producer=_PRODUCER,
inputs=[
Dataset(namespace="bigquery", name="a.b.c"),
Expand Down Expand Up @@ -485,7 +528,15 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, uuid):
)
},
),
job=Job(namespace=_DAG_NAMESPACE, name="dag_id", facets={}),
job=Job(
namespace=_DAG_NAMESPACE,
name="dag_id",
facets={
"jobType": JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW", jobType="DAG"
)
},
),
producer=_PRODUCER,
inputs=[],
outputs=[],
Expand Down Expand Up @@ -527,7 +578,15 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, uuid):
eventType=RunState.COMPLETE,
eventTime=event_time.isoformat(),
run=Run(runId=random_uuid, facets={}),
job=Job(namespace=_DAG_NAMESPACE, name="dag_id", facets={}),
job=Job(
namespace=_DAG_NAMESPACE,
name="dag_id",
facets={
"jobType": JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW", jobType="DAG"
)
},
),
producer=_PRODUCER,
inputs=[],
outputs=[],
Expand Down Expand Up @@ -576,7 +635,15 @@ def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, uuid):
)
},
),
job=Job(namespace=_DAG_NAMESPACE, name="dag_id", facets={}),
job=Job(
namespace=_DAG_NAMESPACE,
name="dag_id",
facets={
"jobType": JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW", jobType="DAG"
)
},
),
producer=_PRODUCER,
inputs=[],
outputs=[],
Expand Down

0 comments on commit 1851a71

Please sign in to comment.