Skip to content

Commit

Permalink
Use write only client (apache#53)
Browse files Browse the repository at this point in the history
* Bump marquez-python to 0.7.0

Signed-off-by: wslulciuc <[email protected]>

* Add collect operator type

Signed-off-by: wslulciuc <[email protected]>

* Remove unused class

Signed-off-by: wslulciuc <[email protected]>

* Add write only client

Signed-off-by: wslulciuc <[email protected]>

* continued:  Add write only client

Signed-off-by: wslulciuc <[email protected]>

* Fix test deps

Signed-off-by: wslulciuc <[email protected]>

* Use client JobType

Signed-off-by: wslulciuc <[email protected]>

* Remove unused file

Signed-off-by: wslulciuc <[email protected]>

* continued: Add write only client

Signed-off-by: wslulciuc <[email protected]>

* Add external run id

Signed-off-by: wslulciuc <[email protected]>

* Pass external run id as string

Signed-off-by: wslulciuc <[email protected]>

* Capture obj members

Signed-off-by: wslulciuc <[email protected]>

* Fix formatting

Signed-off-by: wslulciuc <[email protected]>

* Bump marquez-python to 0.7.1

Signed-off-by: wslulciuc <[email protected]>

* Update task info key in context

Signed-off-by: wslulciuc <[email protected]>

* Remove sample log file

Signed-off-by: wslulciuc <[email protected]>

* Remove pyrfc3339 dep

Signed-off-by: wslulciuc <[email protected]>

* Use airflow.* prefix in context

Signed-off-by: wslulciuc <[email protected]>

* Bump marquez-python to 0.7.2

Signed-off-by: wslulciuc <[email protected]>

* Use task.__dict__ to set airflow.task_info

Signed-off-by: wslulciuc <[email protected]>

* Remove unused import

Signed-off-by: wslulciuc <[email protected]>
  • Loading branch information
wslulciuc authored Sep 2, 2020
1 parent 009aabc commit 7b7b0b1
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 14 deletions.
34 changes: 23 additions & 11 deletions marquez_airflow/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import json
import os
import time
from uuid import uuid4

from marquez_client import MarquezClient
from marquez_client.clients import Clients
from marquez_client.models import JobType

import airflow.models
from marquez_airflow import log
Expand Down Expand Up @@ -174,7 +176,12 @@ def report_task(self,
# If no extractor found or failed to extract metadata,
# report the task metadata
if not steps_metadata:
steps_metadata = [StepMetadata(task_name)]
steps_metadata = [StepMetadata(
name=task_name,
context={
'airflow.operator': task.__class__.__name__,
'airflow.task_info': task.__dict__
})]

# store all the JobRuns associated with a task
marquez_jobrun_ids = []
Expand Down Expand Up @@ -205,7 +212,7 @@ def report_task(self,
marquez_namespace=self.marquez_namespace)

marquez_client.create_job(job_name=step.name,
job_type='BATCH', # job type
job_type=JobType.BATCH, # job type
location=(step.location or
task_location),
input_dataset=input_datasets,
Expand All @@ -217,15 +224,20 @@ def report_task(self,
airflow_dag_id=self.dag_id,
marquez_namespace=self.marquez_namespace)

marquez_jobrun_id = marquez_client.create_job_run(
step.name,
# TODO: Look into generating a uuid based on the DAG run_id
external_run_id = str(uuid4())

marquez_client.create_job_run(
namespace_name=self.marquez_namespace,
job_name=step.name,
run_id=external_run_id,
run_args=run_args,
nominal_start_time=start_time,
nominal_end_time=end_time).get('runId')
nominal_end_time=end_time)

if marquez_jobrun_id:
marquez_jobrun_ids.append(marquez_jobrun_id)
marquez_client.mark_job_run_as_started(marquez_jobrun_id)
if external_run_id:
marquez_jobrun_ids.append(external_run_id)
marquez_client.mark_job_run_as_started(external_run_id)
else:
log.error(f'Failed to get run id: {step.name}',
airflow_dag_id=self.dag_id,
Expand All @@ -234,7 +246,7 @@ def report_task(self,
log.info(f'Successfully recorded job run: {step.name}',
airflow_dag_id=self.dag_id,
airflow_dag_execution_time=start_time,
marquez_run_id=marquez_jobrun_id,
marquez_run_id=external_run_id,
marquez_namespace=self.marquez_namespace,
duration_ms=(self._now_ms() - report_job_start_ms))

Expand Down Expand Up @@ -288,7 +300,7 @@ def report_jobrun_change(self, job_name, run_id, **kwargs):

def get_marquez_client(self):
if not self._marquez_client:
self._marquez_client = MarquezClient()
self._marquez_client = Clients.new_write_only_client()
return self._marquez_client

@staticmethod
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
url="https://github.com/MarquezProject/marquez-airflow",
packages=setuptools.find_packages(),
install_requires=[
"marquez-python==0.6.0",
"marquez-python==0.7.2",
"sqlparse==0.3.1"
],
)
3 changes: 1 addition & 2 deletions test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
apache-airflow==1.10.11
codecov==2.1.8
marquez-python==0.6.0
marquez-python==0.7.2
pytest==6.0.1
pytest-cov==2.10.1
requests==2.24.0

0 comments on commit 7b7b0b1

Please sign in to comment.