diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py index e2ee8a2695008..cf9fb716e3406 100644 --- a/airflow/providers/openlineage/plugins/adapter.py +++ b/airflow/providers/openlineage/plugins/adapter.py @@ -26,6 +26,7 @@ BaseFacet, DocumentationJobFacet, ErrorMessageRunFacet, + JobTypeJobFacet, NominalTimeRunFacet, OwnershipJobFacet, OwnershipJobFacetOwners, @@ -56,6 +57,12 @@ set_producer(_PRODUCER) +# https://openlineage.io/docs/spec/facets/job-facets/job-type +# These facets must be created after the `set_producer(_PRODUCER)` call above; +# otherwise `JobTypeJobFacet._producer` will be set to the default value. +_JOB_TYPE_DAG = JobTypeJobFacet(jobType="DAG", integration="AIRFLOW", processingType="BATCH") +_JOB_TYPE_TASK = JobTypeJobFacet(jobType="TASK", integration="AIRFLOW", processingType="BATCH") + class OpenLineageAdapter(LoggingMixin): """Translate Airflow metadata to OpenLineage events instead of creating them from Airflow code.""" @@ -181,6 +188,7 @@ def start_task( ), job=self._build_job( job_name=job_name, + job_type=_JOB_TYPE_TASK, job_description=job_description, code_location=code_location, owners=owners, @@ -222,7 +230,7 @@ def complete_task( parent_run_id=parent_run_id, run_facets=task.run_facets, ), - job=self._build_job(job_name, job_facets=task.job_facets), + job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets), inputs=task.inputs, outputs=task.outputs, producer=_PRODUCER, @@ -259,7 +267,7 @@ def fail_task( parent_run_id=parent_run_id, run_facets=task.run_facets, ), - job=self._build_job(job_name, job_facets=task.job_facets), + job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets), inputs=task.inputs, outputs=task.outputs, producer=_PRODUCER, @@ -276,7 +284,7 @@ def dag_started( event = RunEvent( eventType=RunState.START, eventTime=dag_run.start_date.isoformat(), - job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE), + 
job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE, facets={"jobType": _JOB_TYPE_DAG}), run=self._build_run( run_id=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id), job_name=dag_run.dag_id, @@ -293,7 +301,7 @@ def dag_success(self, dag_run: DagRun, msg: str): event = RunEvent( eventType=RunState.COMPLETE, eventTime=dag_run.end_date.isoformat(), - job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE), + job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE, facets={"jobType": _JOB_TYPE_DAG}), run=Run(runId=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id)), inputs=[], outputs=[], @@ -305,7 +313,7 @@ def dag_failed(self, dag_run: DagRun, msg: str): event = RunEvent( eventType=RunState.FAIL, eventTime=dag_run.end_date.isoformat(), - job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE), + job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE, facets={"jobType": _JOB_TYPE_DAG}), run=Run( runId=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id), facets={"errorMessage": ErrorMessageRunFacet(message=msg, programmingLanguage="python")}, @@ -350,6 +358,7 @@ def _build_run( @staticmethod def _build_job( job_name: str, + job_type: JobTypeJobFacet, job_description: str | None = None, code_location: str | None = None, owners: list[str] | None = None, @@ -372,4 +381,6 @@ def _build_job( if job_facets: facets = {**facets, **job_facets} + facets.update({"jobType": job_type}) + return Job(_DAG_NAMESPACE, job_name, facets) diff --git a/tests/providers/openlineage/plugins/test_openlineage_adapter.py b/tests/providers/openlineage/plugins/test_openlineage_adapter.py index cdec3de941b29..ec1dfc6eb5741 100644 --- a/tests/providers/openlineage/plugins/test_openlineage_adapter.py +++ b/tests/providers/openlineage/plugins/test_openlineage_adapter.py @@ -28,6 +28,7 @@ DocumentationJobFacet, ErrorMessageRunFacet, ExternalQueryRunFacet, + JobTypeJobFacet, NominalTimeRunFacet, OwnershipJobFacet, OwnershipJobFacetOwners, @@ -163,7 +164,12 @@ def 
test_emit_start_event(mock_stats_incr, mock_stats_timer): job=Job( namespace=_DAG_NAMESPACE, name="job", - facets={"documentation": DocumentationJobFacet(description="description")}, + facets={ + "documentation": DocumentationJobFacet(description="description"), + "jobType": JobTypeJobFacet( + processingType="BATCH", integration="AIRFLOW", jobType="TASK" + ), + }, ), producer=_PRODUCER, inputs=[], @@ -244,6 +250,9 @@ def test_emit_start_event_with_additional_information(mock_stats_incr, mock_stat ] ), "sql": SqlJobFacet(query="SELECT 1;"), + "jobType": JobTypeJobFacet( + processingType="BATCH", integration="AIRFLOW", jobType="TASK" + ), }, ), producer=_PRODUCER, @@ -284,7 +293,15 @@ def test_emit_complete_event(mock_stats_incr, mock_stats_timer): eventType=RunState.COMPLETE, eventTime=event_time, run=Run(runId=run_id, facets={}), - job=Job(namespace=_DAG_NAMESPACE, name="job", facets={}), + job=Job( + namespace=_DAG_NAMESPACE, + name="job", + facets={ + "jobType": JobTypeJobFacet( + processingType="BATCH", integration="AIRFLOW", jobType="TASK" + ) + }, + ), producer=_PRODUCER, inputs=[], outputs=[], @@ -338,7 +355,16 @@ def test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s "externalQuery": ExternalQueryRunFacet(externalQueryId="123", source="source"), }, ), - job=Job(namespace=_DAG_NAMESPACE, name="job", facets={"sql": SqlJobFacet(query="SELECT 1;")}), + job=Job( + namespace="default", + name="job", + facets={ + "sql": SqlJobFacet(query="SELECT 1;"), + "jobType": JobTypeJobFacet( + processingType="BATCH", integration="AIRFLOW", jobType="TASK" + ), + }, + ), producer=_PRODUCER, inputs=[ Dataset(namespace="bigquery", name="a.b.c"), @@ -377,7 +403,15 @@ def test_emit_failed_event(mock_stats_incr, mock_stats_timer): eventType=RunState.FAIL, eventTime=event_time, run=Run(runId=run_id, facets={}), - job=Job(namespace=_DAG_NAMESPACE, name="job", facets={}), + job=Job( + namespace=_DAG_NAMESPACE, + name="job", + facets={ + "jobType": 
JobTypeJobFacet( + processingType="BATCH", integration="AIRFLOW", jobType="TASK" + ) + }, + ), producer=_PRODUCER, inputs=[], outputs=[], @@ -431,7 +465,16 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta "externalQuery": ExternalQueryRunFacet(externalQueryId="123", source="source"), }, ), - job=Job(namespace=_DAG_NAMESPACE, name="job", facets={"sql": SqlJobFacet(query="SELECT 1;")}), + job=Job( + namespace="default", + name="job", + facets={ + "sql": SqlJobFacet(query="SELECT 1;"), + "jobType": JobTypeJobFacet( + processingType="BATCH", integration="AIRFLOW", jobType="TASK" + ), + }, + ), producer=_PRODUCER, inputs=[ Dataset(namespace="bigquery", name="a.b.c"), @@ -485,7 +528,15 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, uuid): ) }, ), - job=Job(namespace=_DAG_NAMESPACE, name="dag_id", facets={}), + job=Job( + namespace=_DAG_NAMESPACE, + name="dag_id", + facets={ + "jobType": JobTypeJobFacet( + processingType="BATCH", integration="AIRFLOW", jobType="DAG" + ) + }, + ), producer=_PRODUCER, inputs=[], outputs=[], @@ -527,7 +578,15 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, uuid): eventType=RunState.COMPLETE, eventTime=event_time.isoformat(), run=Run(runId=random_uuid, facets={}), - job=Job(namespace=_DAG_NAMESPACE, name="dag_id", facets={}), + job=Job( + namespace=_DAG_NAMESPACE, + name="dag_id", + facets={ + "jobType": JobTypeJobFacet( + processingType="BATCH", integration="AIRFLOW", jobType="DAG" + ) + }, + ), producer=_PRODUCER, inputs=[], outputs=[], @@ -576,7 +635,15 @@ def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, uuid): ) }, ), - job=Job(namespace=_DAG_NAMESPACE, name="dag_id", facets={}), + job=Job( + namespace=_DAG_NAMESPACE, + name="dag_id", + facets={ + "jobType": JobTypeJobFacet( + processingType="BATCH", integration="AIRFLOW", jobType="DAG" + ) + }, + ), producer=_PRODUCER, inputs=[], outputs=[],