From e40e068811cd156cb8e13700ff054046f4f9d0dc Mon Sep 17 00:00:00 2001 From: Rodrigo Araya Date: Fri, 8 Feb 2019 05:22:08 -0800 Subject: [PATCH] Cleanup usage of marquez-client (#1) --- marquez/airflow.py | 40 +++++++++++++------------- marquez/client/__init__.py | 57 -------------------------------------- 2 files changed, 20 insertions(+), 77 deletions(-) delete mode 100644 marquez/client/__init__.py diff --git a/marquez/airflow.py b/marquez/airflow.py index caec008dcff3d..9be60de45f334 100644 --- a/marquez/airflow.py +++ b/marquez/airflow.py @@ -2,7 +2,7 @@ import pendulum import airflow.models from airflow.utils.db import provide_session -from marquez.client import MarquezClient, RunState +from marquez_client.marquez import MarquezClient class MarquezDag(airflow.models.DAG): @@ -20,14 +20,14 @@ def create_dagrun(self, *args, **kwargs): job_run_args = "{}" # TODO retrieve from DAG/tasks start_time = pendulum.instance(kwargs['execution_date']).to_datetime_string() end_time = None - state = RunState.RUNNING self.mqz_client.set_namespace(self.mqz_namespace) self.mqz_client.create_job(job_name, self.mqz_location, self.mqz_input_datasets, self.mqz_output_datasets, self.description) mqz_job_run_id = self.mqz_client.create_job_run(job_name, job_run_args=job_run_args, - nominal_start_time=start_time, nominal_end_time=end_time) - self.mqz_client.set_jobrun_state(mqz_job_run_id, state) + nominal_start_time=start_time, + nominal_end_time=end_time).run_id + self.mqz_client.mark_job_run_running(mqz_job_run_id) self.marquez_log('job_running', json.dumps( {'namespace': self.mqz_namespace, @@ -39,8 +39,7 @@ def create_dagrun(self, *args, **kwargs): 'nominal_end_time': end_time, 'jobrun_id': mqz_job_run_id, 'inputDatasetUrns': self.mqz_input_datasets, - 'outputDatasetUrns': self.mqz_output_datasets, - 'state': str(state.name) + 'outputDatasetUrns': self.mqz_output_datasets })) run = super().create_dagrun(*args, **kwargs) @@ -50,22 +49,23 @@ def create_dagrun(self, *args, **kwargs): def handle_callback(self, *args, **kwargs): job_name = self.dag_id - - if kwargs.get('success'): - state = RunState.COMPLETED - else: - state = RunState.FAILED - - - mqz_job_run_id = self.get_and_delete(args[0].run_id) if mqz_job_run_id: - self.mqz_client.set_jobrun_state(mqz_job_run_id, state) - self.marquez_log('job_state_change', - json.dumps({'job_name': job_name, - 'jobrun_id': mqz_job_run_id, - 'state': str(state.name)})) + + if kwargs.get('success'): + self.mqz_client.mark_job_run_completed(mqz_job_run_id) + self.marquez_log('job_state_change', + json.dumps({'job_name': job_name, + 'jobrun_id': mqz_job_run_id, + 'state': 'COMPLETED'})) + else: + self.mqz_client.mark_job_run_failed(mqz_job_run_id) + self.marquez_log('job_state_change', + json.dumps({'job_name': job_name, + 'jobrun_id': mqz_job_run_id, + 'state': 'FAILED'})) + else: # TODO warn that the jobrun_id couldn't be found pass @@ -90,4 +90,4 @@ def marquez_log(self, event, extras, session=None): owner="marquez", extra=extras, task_id=None, - dag_id=self.dag_id)) \ No newline at end of file + dag_id=self.dag_id)) diff --git a/marquez/client/__init__.py b/marquez/client/__init__.py deleted file mode 100644 index d9be9b3cf6087..0000000000000 --- a/marquez/client/__init__.py +++ /dev/null @@ -1,57 +0,0 @@ -from enum import Enum -from marquez_client import Configuration, CreateNamespace, CreateJob, CreateJobRun, NamespacesApi, ApiClient, JobsApi - -class RunState(Enum): - RUNNING = 1 - COMPLETED = 2 - FAILED = 3 - ABORTED = 4 - - -class MarquezClient: - namespace = None - jobs_api_client = None - namespace_api_client = None - - def __init__(self): - conf = Configuration() - conf.host = 'localhost:5000/api/v1' # TODO make it a set() method - - # create an instance of the API class - api_client = ApiClient(conf) - self.jobs_api_client = JobsApi(api_client) - self.namespace_api_client = NamespacesApi(api_client) - - def set_namespace(self, namespace): - self.namespace = namespace - payload = CreateNamespace('anonymous', '') - return self.namespace_api_client.namespaces_namespace_put(namespace, create_namespace=payload) - - def create_job(self, job_name, location, input_dataset_urns, output_dataset_urns, description): - if self.namespace is None: - raise("You have to set the namespace first.") # TODO change this to be a WARN - - payload = CreateJob(input_dataset_urns, output_dataset_urns, location, description) - return self.jobs_api_client.namespaces_namespace_jobs_job_put(self.namespace, job_name, create_job=payload) - - - def create_job_run(self, job_name, job_run_args, nominal_start_time, nominal_end_time): - if self.namespace is None: - raise("You have to set the namespace first.") # TODO change this to be a WARN - - payload = CreateJobRun(nominal_start_time, nominal_end_time, job_run_args) - return self.jobs_api_client.namespaces_namespace_jobs_job_runs_post( - self.namespace, job_name, create_job_run=payload).run_id - - def set_jobrun_state(self, run_id, state): - if state == RunState.RUNNING: - self.jobs_api_client.jobs_runs_id_run_put(run_id) - elif state == RunState.COMPLETED: - self.jobs_api_client.jobs_runs_id_complete_put(run_id) - elif state == RunState.FAILED: - self.jobs_api_client.jobs_runs_id_fail_put(run_id) - elif state == RunState.ABORTED: - self.jobs_api_client.jobs_runs_id_abort_put(run_id) - else: - # TODO WARN invalid state - pass