From 8d1282412067bca0e72ff599c3d2c9450d95b9ea Mon Sep 17 00:00:00 2001
From: Rodrigo Araya
Date: Thu, 24 Jan 2019 16:08:17 -0800
Subject: [PATCH] Using the DAG extension approach

- extended airflow's DAG for metadata collection
- added marquez-client to submit metadata to Marquez
---
 .gitignore                 | 17 +++++++
 README.md                  | 45 +++++++++++++++++-
 examples/demo_dag.py       | 22 +++++++++
 marquez/__init__.py        |  0
 marquez/airflow.py         | 93 ++++++++++++++++++++++++++++++++++++++
 marquez/client/__init__.py | 57 +++++++++++++++++++++++
 setup.py                   | 24 ++++++++++
 7 files changed, 257 insertions(+), 1 deletion(-)
 create mode 100644 .gitignore
 create mode 100644 examples/demo_dag.py
 create mode 100644 marquez/__init__.py
 create mode 100644 marquez/airflow.py
 create mode 100644 marquez/client/__init__.py
 create mode 100644 setup.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000000000..7424394464022
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,17 @@
+*.iml
+*.jar
+*.swp
+*.egg-info
+.classpath
+.editorconfig
+.gradle/
+.idea/
+.project
+.settings
+.vscode/
+./out/*
+./**/*.class
+bin/
+build/
+dist/
+__pycache__
diff --git a/README.md b/README.md
index 1fade4161726f..aafb2518c8115 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,45 @@
 # marquez-airflow
-Library for Marquez integration with Airflow
+A library that integrates Airflow DAGs with Marquez for automatic metadata collection.
+
+# Requirements
+ - Python 3.5+
+ - apache-airflow 1.10.0+
+ - marquez-client
+
+# Installation
+
+```
+pip install marquez-airflow
+```
+
+# Usage
+
+Once the library is installed on your system, your existing DAGs need only a small change: use `MarquezDag` in place of Airflow's `DAG`, as in the example below:
+
+```python
+from marquez.airflow import MarquezDag as DAG
+from airflow.operators.dummy_operator import DummyOperator
+from datetime import datetime
+
+
+DAG_NAME = 'my_DAG_name'
+
+default_args = {
+    'mqz_namespace': 'namespace_1',
+    'mqz_location': 'github://data-dags/dag_location/',
+    'mqz_input_datasets': ["s3://some_data", "s3://more_data"],
+    'mqz_output_datasets': ["s3://output_data"],
+
+    'owner': ...,
+    'depends_on_past': False,
+    'start_date': ...,
+}
+
+dag = DAG(DAG_NAME, schedule_interval='*/10 * * * *',
+          default_args=default_args, description="yet another DAG")
+
+run_this = DummyOperator(task_id='run_this', dag=dag)
+run_this_too = DummyOperator(task_id='run_this_too', dag=dag)
+run_this_too.set_upstream(run_this)
+```
+
diff --git a/examples/demo_dag.py b/examples/demo_dag.py
new file mode 100644
index 0000000000000..e9fd77714f041
--- /dev/null
+++ b/examples/demo_dag.py
@@ -0,0 +1,22 @@
+from marquez.airflow import MarquezDag as DAG
+from airflow.operators.dummy_operator import DummyOperator
+from datetime import datetime
+
+DAG_NAME = 'test_dag_v2'
+
+default_args = {
+    'mqz_namespace': 'demo',
+    'mqz_location': 'github://my_dag_location',
+    'mqz_input_datasets': ["s3://great_data", "s3://not_so_good_data"],
+    'mqz_output_datasets': ["s3://amazing_data"],
+    'owner': 'some dag developer',
+    'depends_on_past': False,
+    'start_date': datetime(2019, 1, 31),
+}
+
+dag = DAG(DAG_NAME, schedule_interval='*/10 * * * *',
+          default_args=default_args, description="My awesome DAG")
+
+run_this_1 = DummyOperator(task_id='run_this_1', dag=dag)
+run_this_2 = DummyOperator(task_id='run_this_2', dag=dag)
+run_this_2.set_upstream(run_this_1)
diff --git a/marquez/__init__.py b/marquez/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/marquez/airflow.py b/marquez/airflow.py
new file mode 100644
index 0000000000000..caec008dcff3d
--- /dev/null
+++ b/marquez/airflow.py
@@ -0,0 +1,93 @@
+import json
+import pendulum
+import airflow.models
+from airflow.utils.db import provide_session
+from marquez.client import MarquezClient, RunState
+
+
+class MarquezDag(airflow.models.DAG):
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.mqz_client = MarquezClient()
+        self.mqz_namespace = kwargs['default_args'].get('mqz_namespace', 'unknown')
+        self.mqz_location = kwargs['default_args'].get('mqz_location', 'unknown')
+        self.mqz_input_datasets = kwargs['default_args'].get('mqz_input_datasets', [])
+        self.mqz_output_datasets = kwargs['default_args'].get('mqz_output_datasets', [])
+
+    def create_dagrun(self, *args, **kwargs):
+        job_name = self.dag_id
+        job_run_args = "{}"  # TODO retrieve from DAG/tasks
+        start_time = pendulum.instance(kwargs['execution_date']).to_datetime_string()
+        end_time = None
+        state = RunState.RUNNING
+
+        self.mqz_client.set_namespace(self.mqz_namespace)
+        self.mqz_client.create_job(job_name, self.mqz_location, self.mqz_input_datasets, self.mqz_output_datasets,
+                                   self.description)
+        mqz_job_run_id = self.mqz_client.create_job_run(job_name, job_run_args=job_run_args,
+                                                        nominal_start_time=start_time, nominal_end_time=end_time)
+        self.mqz_client.set_jobrun_state(mqz_job_run_id, state)
+
+        self.marquez_log('job_running', json.dumps(
+            {'namespace': self.mqz_namespace,
+             'name': job_name,
+             'description': self.description,
+             'location': self.mqz_location,
+             'runArgs': job_run_args,
+             'nominal_start_time': start_time,
+             'nominal_end_time': end_time,
+             'jobrun_id': mqz_job_run_id,
+             'inputDatasetUrns': self.mqz_input_datasets,
+             'outputDatasetUrns': self.mqz_output_datasets,
+             'state': str(state.name)
+             }))
+
+        run = super().create_dagrun(*args, **kwargs)
+        airflow.models.Variable.set(run.run_id, mqz_job_run_id)
+
+        return run
+
+    def handle_callback(self, *args, **kwargs):
+        job_name = self.dag_id
+
+        if kwargs.get('success'):
+            state = RunState.COMPLETED
+        else:
+            state = RunState.FAILED
+
+
+
+        mqz_job_run_id = self.get_and_delete(args[0].run_id)
+
+        if mqz_job_run_id:
+            self.mqz_client.set_jobrun_state(mqz_job_run_id, state)
+            self.marquez_log('job_state_change',
+                             json.dumps({'job_name': job_name,
+                                         'jobrun_id': mqz_job_run_id,
+                                         'state': str(state.name)}))
+        else:
+            # TODO warn that the jobrun_id couldn't be found
+            pass
+
+        return super().handle_callback(*args, **kwargs)
+
+    @provide_session
+    def get_and_delete(self, key, session=None):
+        q = session.query(airflow.models.Variable).filter(airflow.models.Variable.key == key)
+        if q.first() is None:
+            return
+        else:
+            val = q.first().val
+            q.delete(synchronize_session=False)
+            return val
+
+    @provide_session
+    def marquez_log(self, event, extras, session=None):
+        session.add(airflow.models.Log(
+            event=event,
+            task_instance=None,
+            owner="marquez",
+            extra=extras,
+            task_id=None,
+            dag_id=self.dag_id))
\ No newline at end of file
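A note on the file above: `create_dagrun()` hands the Marquez job-run id to `handle_callback()` through an Airflow Variable keyed by the DagRun's `run_id`, and `get_and_delete()` removes that Variable once it has been read, so finished runs leave nothing behind. A minimal sketch of the same read-then-delete pattern in isolation (the key/value pair is illustrative, and an initialized Airflow metadata database is assumed):

```python
import airflow.models
from airflow.utils.db import provide_session


@provide_session
def pop_variable(key, session=None):
    # Same pattern as MarquezDag.get_and_delete(): return the stored value
    # once, deleting the row so the Variable table does not grow forever.
    q = session.query(airflow.models.Variable).filter(
        airflow.models.Variable.key == key)
    row = q.first()
    if row is None:
        return None
    val = row.val
    q.delete(synchronize_session=False)
    return val


# Illustrative handoff: create_dagrun() stores the Marquez run id under
# the Airflow run_id, and handle_callback() pops it when the run finishes.
airflow.models.Variable.set('scheduled__2019-01-31T00:00:00', 'mqz-run-id-123')
assert pop_variable('scheduled__2019-01-31T00:00:00') == 'mqz-run-id-123'
assert pop_variable('scheduled__2019-01-31T00:00:00') is None
```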
diff --git a/marquez/client/__init__.py b/marquez/client/__init__.py
new file mode 100644
index 0000000000000..d9be9b3cf6087
--- /dev/null
+++ b/marquez/client/__init__.py
@@ -0,0 +1,57 @@
+from enum import Enum
+from marquez_client import Configuration, CreateNamespace, CreateJob, CreateJobRun, NamespacesApi, ApiClient, JobsApi
+
+class RunState(Enum):
+    RUNNING = 1
+    COMPLETED = 2
+    FAILED = 3
+    ABORTED = 4
+
+
+class MarquezClient:
+    namespace = None
+    jobs_api_client = None
+    namespace_api_client = None
+
+    def __init__(self):
+        conf = Configuration()
+        conf.host = 'localhost:5000/api/v1'  # TODO make it a set() method
+
+        # create an instance of the API class
+        api_client = ApiClient(conf)
+        self.jobs_api_client = JobsApi(api_client)
+        self.namespace_api_client = NamespacesApi(api_client)
+
+    def set_namespace(self, namespace):
+        self.namespace = namespace
+        payload = CreateNamespace('anonymous', '')
+        return self.namespace_api_client.namespaces_namespace_put(namespace, create_namespace=payload)
+
+    def create_job(self, job_name, location, input_dataset_urns, output_dataset_urns, description):
+        if self.namespace is None:
+            raise Exception("You have to set the namespace first.")  # TODO change this to be a WARN
+
+        payload = CreateJob(input_dataset_urns, output_dataset_urns, location, description)
+        return self.jobs_api_client.namespaces_namespace_jobs_job_put(self.namespace, job_name, create_job=payload)
+
+
+    def create_job_run(self, job_name, job_run_args, nominal_start_time, nominal_end_time):
+        if self.namespace is None:
+            raise Exception("You have to set the namespace first.")  # TODO change this to be a WARN
+
+        payload = CreateJobRun(nominal_start_time, nominal_end_time, job_run_args)
+        return self.jobs_api_client.namespaces_namespace_jobs_job_runs_post(
+            self.namespace, job_name, create_job_run=payload).run_id
+
+    def set_jobrun_state(self, run_id, state):
+        if state == RunState.RUNNING:
+            self.jobs_api_client.jobs_runs_id_run_put(run_id)
+        elif state == RunState.COMPLETED:
+            self.jobs_api_client.jobs_runs_id_complete_put(run_id)
+        elif state == RunState.FAILED:
+            self.jobs_api_client.jobs_runs_id_fail_put(run_id)
+        elif state == RunState.ABORTED:
+            self.jobs_api_client.jobs_runs_id_abort_put(run_id)
+        else:
+            # TODO WARN invalid state
+            pass
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000000000..7ccd8431060d9
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,24 @@
+import io
+import setuptools
+
+with io.open('README.md', encoding='utf-8') as f:
+    long_description = f.read()
+
+setuptools.setup(
+    name="marquez-airflow",
+    version="0.0.1",
+    author="Author",
+    author_email="author@example.com",
+    description="Marquez integration with Airflow",
+    long_description=long_description,
+    long_description_content_type="text/markdown",
+    url="https://github.com/MarquezProject/marquez-airflow",
+    packages=setuptools.find_packages(),
+    install_requires=[
+        "apache-airflow>=1.10.0"
+    ],
+    classifiers=[
+        "Programming Language :: Python :: 3",
+        "License :: OSI Approved :: Apache Software License",
+    ],
+)
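For completeness, the calls `MarquezDag` makes during `create_dagrun()` can also be issued against the client directly. A minimal sketch using only the methods added in this patch (it assumes a Marquez server behind the client's hard-coded `localhost:5000/api/v1` host; every job name and dataset URN below is illustrative):

```python
from marquez.client import MarquezClient, RunState

client = MarquezClient()

# The namespace must be set before creating jobs or job runs;
# create_job()/create_job_run() raise otherwise.
client.set_namespace('demo')

client.create_job('my_job', 'github://my_dag_location',
                  ['s3://input_data'], ['s3://output_data'],
                  'an example job')

run_id = client.create_job_run('my_job', job_run_args='{}',
                               nominal_start_time='2019-01-31 00:00:00',
                               nominal_end_time=None)

# Mark the run as started, then completed, mirroring what MarquezDag
# does in create_dagrun() and handle_callback().
client.set_jobrun_state(run_id, RunState.RUNNING)
client.set_jobrun_state(run_id, RunState.COMPLETED)
```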