Skip to content

Commit

Permalink
Use the method _build_job to create the Job object for DAG events
Browse files Browse the repository at this point in the history
Signed-off-by: Mattia Bertorello <[email protected]>
  • Loading branch information
mattiabertorello committed Feb 15, 2024
1 parent 0ee175e commit f346c15
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def dag_started(
event = RunEvent(
eventType=RunState.START,
eventTime=dag_run.start_date.isoformat(),
job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE, facets={"jobType": _JOB_TYPE_DAG}),
job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
run=self._build_run(
run_id=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id),
job_name=dag_run.dag_id,
Expand All @@ -301,7 +301,7 @@ def dag_success(self, dag_run: DagRun, msg: str):
event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=dag_run.end_date.isoformat(),
job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE, facets={"jobType": _JOB_TYPE_DAG}),
job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
run=Run(runId=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id)),
inputs=[],
outputs=[],
Expand All @@ -313,7 +313,7 @@ def dag_failed(self, dag_run: DagRun, msg: str):
event = RunEvent(
eventType=RunState.FAIL,
eventTime=dag_run.end_date.isoformat(),
job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE, facets={"jobType": _JOB_TYPE_DAG}),
job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
run=Run(
runId=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id),
facets={"errorMessage": ErrorMessageRunFacet(message=msg, programmingLanguage="python")},
Expand Down

0 comments on commit f346c15

Please sign in to comment.