Skip to content

Commit

Permalink
Use system logging instead of Airflow logs (apache#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
roaraya8 authored Mar 11, 2019
1 parent ae04369 commit 49497a0
Showing 1 changed file with 13 additions and 32 deletions.
45 changes: 13 additions & 32 deletions marquez/airflow.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import json
import logging
import pendulum
from airflow.models import DAG, Log
from airflow.utils.db import provide_session
from airflow.models import DAG
from marquez_client.marquez import MarquezClient

from marquez.utils import JobIdMapping


Expand Down Expand Up @@ -52,18 +50,9 @@ def report_jobrun(self, run_args, execution_date):
nominal_start_time=start_time,
nominal_end_time=end_time).run_id)
mqz_client.mark_job_run_running(mqz_job_run_id)

self.log_marquez_event('job_running',
namespace=self.mqz_namespace,
name=job_name,
description=self.description,
location=self.mqz_location,
runArgs=job_run_args,
nominal_start_time=start_time,
nominal_end_time=end_time,
jobrun_id=mqz_job_run_id,
inputDatasetUrns=self.mqz_input_datasets,
outputDatasetUrns=self.mqz_output_datasets)
self.log_marquez_event(['job_running',
mqz_job_run_id,
start_time])
return mqz_job_run_id

def report_jobrun_change(self, dagrun, **kwargs):
Expand All @@ -75,23 +64,15 @@ def report_jobrun_change(self, dagrun, **kwargs):
else:
self.get_mqz_client().mark_job_run_failed(mqz_job_run_id)
state = 'COMPLETED' if kwargs.get('success') else 'FAILED'
event = ('job_state_change' if mqz_job_run_id
else 'job_state_change_LOST')
self.log_marquez_event(event,
job_name=self.dag_id,
jobrun_id=mqz_job_run_id,
state=state,
reason=kwargs['reason'])
self.log_marquez_event(['job_state_change',
mqz_job_run_id,
state])

@provide_session
def log_marquez_event(self, event, session=None, **kwargs):
session.add(Log(
event=event,
task_instance=None,
owner="marquez",
extra=json.dumps(kwargs),
task_id=None,
dag_id=self.dag_id))
def log_marquez_event(self, args):
logging.info("\t".join(["[marquez]",
self.mqz_namespace,
self.dag_id,
] + args))

def get_mqz_client(self):
if not self._mqz_client:
Expand Down

0 comments on commit 49497a0

Please sign in to comment.