Using the DAG extension approach
- extended Airflow's DAG for metadata collection
- added marquez-client to submit metadata to Marquez
roaraya8 committed Feb 4, 2019
1 parent 262f516 commit 8d12824
Showing 7 changed files with 257 additions and 1 deletion.
17 changes: 17 additions & 0 deletions .gitignore
@@ -0,0 +1,17 @@
*.iml
*.jar
*.swp
*.egg-info
.classpath
.editorconfig
.gradle/
.idea/
.project
.settings
.vscode/
/out/*
**/*.class
bin/
build/
dist/
__pycache__
45 changes: 44 additions & 1 deletion README.md
@@ -1,2 +1,45 @@
# marquez-airflow
A library that integrates Airflow DAGs with Marquez for automatic metadata collection.

# Requirements
- Python 3.5+
- apache-airflow 1.10.0+
- marquez-client

# Installation

```
pip install marquez-airflow
```

# Usage

Once the library is installed, update your existing DAGs by replacing Airflow's `DAG` class with `MarquezDag`, as in the example below:

```python
from marquez.airflow import MarquezDag as DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime


DAG_NAME = 'my_DAG_name'

default_args = {
    'mqz_namespace': 'namespace_1',
    'mqz_location': 'github://data-dags/dag_location/',
    'mqz_input_datasets': ["s3://some_data", "s3://more_data"],
    'mqz_output_datasets': ["s3://output_data"],

    'owner': ...,
    'depends_on_past': False,
    'start_date': ...,
}

dag = DAG(DAG_NAME, schedule_interval='*/10 * * * *',
          default_args=default_args, description="yet another DAG")

run_this = DummyOperator(task_id='run_this', dag=dag)
run_this_too = DummyOperator(task_id='run_this_too', dag=dag)
run_this_too.set_upstream(run_this)
```
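
Under the hood, `MarquezDag` extends Airflow's `DAG` class: when a DAG run is created, it registers the job and a new job run with Marquez and marks the run `RUNNING`; when the DAG finishes, the run is marked `COMPLETED` or `FAILED`. You never call the client yourself, but the rough sketch below shows the sequence of calls made on your behalf (it assumes a Marquez server at `localhost:5000`, which the bundled client currently hard-codes):

```python
from marquez.client import MarquezClient, RunState

client = MarquezClient()
client.set_namespace('namespace_1')

# Register (or update) the job: its code location and dataset lineage.
client.create_job('my_DAG_name', 'github://data-dags/dag_location/',
                  ["s3://some_data", "s3://more_data"], ["s3://output_data"],
                  "yet another DAG")

# Create a run for this execution and mark it RUNNING; when the DAG
# completes, the run is moved to COMPLETED (or FAILED).
run_id = client.create_job_run('my_DAG_name', job_run_args="{}",
                               nominal_start_time='2019-01-31 00:00:00',
                               nominal_end_time=None)
client.set_jobrun_state(run_id, RunState.RUNNING)
```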

22 changes: 22 additions & 0 deletions examples/demo_dag.py
@@ -0,0 +1,22 @@

from marquez.airflow import MarquezDag as DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
DAG_NAME = 'test_dag_v2'

default_args = {
    'mqz_namespace': 'demo',
    'mqz_location': 'github://my_dag_location',
    'mqz_input_datasets': ["s3://great_data", "s3://not_so_good_data"],
    'mqz_output_datasets': ["s3://amazing_data"],
    'owner': 'some dag developer',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 31),
}

dag = DAG(DAG_NAME, schedule_interval='*/10 * * * *',
          default_args=default_args, description="My awesome DAG")

run_this_1 = DummyOperator(task_id='run_this_1', dag=dag)
run_this_2 = DummyOperator(task_id='run_this_2', dag=dag)
run_this_2.set_upstream(run_this_1)
Empty file added marquez/__init__.py
Empty file.
93 changes: 93 additions & 0 deletions marquez/airflow.py
@@ -0,0 +1,93 @@
import json
import pendulum
import airflow.models
from airflow.utils.db import provide_session
from marquez.client import MarquezClient, RunState


class MarquezDag(airflow.models.DAG):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.mqz_client = MarquezClient()

        # Marquez-specific settings are read from the DAG's default_args.
        default_args = kwargs.get('default_args') or {}
        self.mqz_namespace = default_args.get('mqz_namespace', 'unknown')
        self.mqz_location = default_args.get('mqz_location', 'unknown')
        self.mqz_input_datasets = default_args.get('mqz_input_datasets', [])
        self.mqz_output_datasets = default_args.get('mqz_output_datasets', [])

    def create_dagrun(self, *args, **kwargs):
        job_name = self.dag_id
        job_run_args = "{}"  # TODO retrieve from DAG/tasks
        start_time = pendulum.instance(kwargs['execution_date']).to_datetime_string()
        end_time = None
        state = RunState.RUNNING

        # Register the job and a new job run with Marquez, then mark it RUNNING.
        self.mqz_client.set_namespace(self.mqz_namespace)
        self.mqz_client.create_job(job_name, self.mqz_location,
                                   self.mqz_input_datasets, self.mqz_output_datasets,
                                   self.description)
        mqz_job_run_id = self.mqz_client.create_job_run(
            job_name, job_run_args=job_run_args,
            nominal_start_time=start_time, nominal_end_time=end_time)
        self.mqz_client.set_jobrun_state(mqz_job_run_id, state)

        self.marquez_log('job_running', json.dumps(
            {'namespace': self.mqz_namespace,
             'name': job_name,
             'description': self.description,
             'location': self.mqz_location,
             'runArgs': job_run_args,
             'nominal_start_time': start_time,
             'nominal_end_time': end_time,
             'jobrun_id': mqz_job_run_id,
             'inputDatasetUrns': self.mqz_input_datasets,
             'outputDatasetUrns': self.mqz_output_datasets,
             'state': str(state.name)
             }))

        run = super().create_dagrun(*args, **kwargs)
        # Remember the Marquez run id under the Airflow run id so that
        # handle_callback() can look it up when the DAG run finishes.
        airflow.models.Variable.set(run.run_id, mqz_job_run_id)

        return run

    def handle_callback(self, *args, **kwargs):
        job_name = self.dag_id

        if kwargs.get('success'):
            state = RunState.COMPLETED
        else:
            state = RunState.FAILED

        # args[0] is the DagRun; recover the Marquez run id stored by create_dagrun().
        mqz_job_run_id = self.get_and_delete(args[0].run_id)

        if mqz_job_run_id:
            self.mqz_client.set_jobrun_state(mqz_job_run_id, state)
            self.marquez_log('job_state_change',
                             json.dumps({'job_name': job_name,
                                         'jobrun_id': mqz_job_run_id,
                                         'state': str(state.name)}))
        else:
            # TODO warn that the jobrun_id couldn't be found
            pass

        return super().handle_callback(*args, **kwargs)

    @provide_session
    def get_and_delete(self, key, session=None):
        q = session.query(airflow.models.Variable).filter(
            airflow.models.Variable.key == key)
        row = q.first()
        if row is None:
            return None
        val = row.val
        q.delete(synchronize_session=False)
        return val

    @provide_session
    def marquez_log(self, event, extras, session=None):
        session.add(airflow.models.Log(
            event=event,
            task_instance=None,
            owner="marquez",
            extra=extras,
            task_id=None,
            dag_id=self.dag_id))
57 changes: 57 additions & 0 deletions marquez/client/__init__.py
@@ -0,0 +1,57 @@
from enum import Enum
from marquez_client import (Configuration, CreateNamespace, CreateJob,
                            CreateJobRun, NamespacesApi, ApiClient, JobsApi)

class RunState(Enum):
    RUNNING = 1
    COMPLETED = 2
    FAILED = 3
    ABORTED = 4


class MarquezClient:
    namespace = None
    jobs_api_client = None
    namespace_api_client = None

    def __init__(self):
        conf = Configuration()
        conf.host = 'localhost:5000/api/v1'  # TODO make it a set() method

        # create an instance of the generated API client
        api_client = ApiClient(conf)
        self.jobs_api_client = JobsApi(api_client)
        self.namespace_api_client = NamespacesApi(api_client)

    def set_namespace(self, namespace):
        self.namespace = namespace
        payload = CreateNamespace('anonymous', '')
        return self.namespace_api_client.namespaces_namespace_put(
            namespace, create_namespace=payload)

    def create_job(self, job_name, location, input_dataset_urns,
                   output_dataset_urns, description):
        if self.namespace is None:
            # TODO change this to be a WARN
            raise ValueError("The namespace must be set before creating a job.")

        payload = CreateJob(input_dataset_urns, output_dataset_urns, location, description)
        return self.jobs_api_client.namespaces_namespace_jobs_job_put(
            self.namespace, job_name, create_job=payload)

    def create_job_run(self, job_name, job_run_args, nominal_start_time, nominal_end_time):
        if self.namespace is None:
            # TODO change this to be a WARN
            raise ValueError("The namespace must be set before creating a job run.")

        payload = CreateJobRun(nominal_start_time, nominal_end_time, job_run_args)
        return self.jobs_api_client.namespaces_namespace_jobs_job_runs_post(
            self.namespace, job_name, create_job_run=payload).run_id

    def set_jobrun_state(self, run_id, state):
        if state == RunState.RUNNING:
            self.jobs_api_client.jobs_runs_id_run_put(run_id)
        elif state == RunState.COMPLETED:
            self.jobs_api_client.jobs_runs_id_complete_put(run_id)
        elif state == RunState.FAILED:
            self.jobs_api_client.jobs_runs_id_fail_put(run_id)
        elif state == RunState.ABORTED:
            self.jobs_api_client.jobs_runs_id_abort_put(run_id)
        else:
            # TODO WARN invalid state
            pass
24 changes: 24 additions & 0 deletions setup.py
@@ -0,0 +1,24 @@
import io
import setuptools

with io.open('README.md', encoding='utf-8') as f:
    long_description = f.read()

setuptools.setup(
    name="marquez-airflow",
    version="0.0.1",
    author="Author",
    author_email="[email protected]",
    description="Marquez integration with Airflow",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://github.com/MarquezProject/marquez-airflow",
    packages=setuptools.find_packages(),
    install_requires=[
        "apache-airflow>=1.10.0",
        "marquez-client"
    ],
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: Apache Software License",
    ],
)
