From 49497a07589274b09f2599dc3d39112a7275af70 Mon Sep 17 00:00:00 2001 From: Rodrigo Araya Date: Mon, 11 Mar 2019 15:18:33 -0700 Subject: [PATCH] Use system logging instead of Airflow logs (#11) --- marquez/airflow.py | 45 +++++++++++++-------------------------------- 1 file changed, 13 insertions(+), 32 deletions(-) diff --git a/marquez/airflow.py b/marquez/airflow.py index 03ba2bd71960e..74987fff2ac5d 100644 --- a/marquez/airflow.py +++ b/marquez/airflow.py @@ -1,9 +1,7 @@ -import json +import logging import pendulum -from airflow.models import DAG, Log -from airflow.utils.db import provide_session +from airflow.models import DAG from marquez_client.marquez import MarquezClient - from marquez.utils import JobIdMapping @@ -52,18 +50,9 @@ def report_jobrun(self, run_args, execution_date): nominal_start_time=start_time, nominal_end_time=end_time).run_id) mqz_client.mark_job_run_running(mqz_job_run_id) - - self.log_marquez_event('job_running', - namespace=self.mqz_namespace, - name=job_name, - description=self.description, - location=self.mqz_location, - runArgs=job_run_args, - nominal_start_time=start_time, - nominal_end_time=end_time, - jobrun_id=mqz_job_run_id, - inputDatasetUrns=self.mqz_input_datasets, - outputDatasetUrns=self.mqz_output_datasets) + self.log_marquez_event(['job_running', + mqz_job_run_id, + start_time]) return mqz_job_run_id def report_jobrun_change(self, dagrun, **kwargs): @@ -75,23 +64,15 @@ def report_jobrun_change(self, dagrun, **kwargs): else: self.get_mqz_client().mark_job_run_failed(mqz_job_run_id) state = 'COMPLETED' if kwargs.get('success') else 'FAILED' - event = ('job_state_change' if mqz_job_run_id - else 'job_state_change_LOST') - self.log_marquez_event(event, - job_name=self.dag_id, - jobrun_id=mqz_job_run_id, - state=state, - reason=kwargs['reason']) + self.log_marquez_event(['job_state_change', + mqz_job_run_id, + state]) - @provide_session - def log_marquez_event(self, event, session=None, **kwargs): - session.add(Log( - event=event, - task_instance=None, - owner="marquez", - extra=json.dumps(kwargs), - task_id=None, - dag_id=self.dag_id)) + def log_marquez_event(self, args): + logging.info("\t".join(["[marquez]", + self.mqz_namespace, + self.dag_id, + ] + args)) def get_mqz_client(self): if not self._mqz_client: