Skip to content

Commit

Permalink
Feature/update client version 0 3 0 (apache#21)
Browse files Browse the repository at this point in the history
* Adding new version of marquez-python and the resulting api changes in the test and integration

* Remove coverage from being checked in and adjust gitignore

* Remove requests from dependencies
  • Loading branch information
ashulmanWeWork authored Apr 25, 2019
1 parent 6b01aa3 commit 2d45b5c
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 18 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.coverage
*.iml
*.jar
*.swp
Expand All @@ -13,5 +14,6 @@
./**/*.class
bin/
build/
coverage.xml
dist/
__pycache__
17 changes: 9 additions & 8 deletions marquez_airflow/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import pendulum
import airflow.models
from marquez_client.marquez import MarquezClient

from marquez_client.client import Client
from marquez_airflow.utils import JobIdMapping


Expand Down Expand Up @@ -67,19 +67,19 @@ def report_jobrun(self, run_args, execution_date):
start_time = DAG.to_airflow_time(execution_date)
end_time = self.compute_endtime(execution_date)
marquez_client = self.get_marquez_client()
marquez_client.set_namespace(self.marquez_namespace)

marquez_client.create_job(
job_name, self.marquez_location,
self.marquez_input_urns, self.marquez_output_urns,
self.description)
description=self.description)
marquez_jobrun = marquez_client.create_job_run(
job_name, job_run_args=job_run_args,
nominal_start_time=start_time,
nominal_end_time=end_time)

marquez_jobrun_id = str(marquez_jobrun.run_id)

marquez_client.mark_job_run_running(marquez_jobrun_id)
marquez_client.mark_job_run_as_running(marquez_jobrun_id)
self.log_marquez_event(['job_running',
marquez_jobrun_id,
start_time])
Expand All @@ -101,10 +101,10 @@ def report_jobrun_change(self, dagrun, **kwargs):
JobIdMapping.make_key(dagrun.dag_id, dagrun.run_id), session)
if marquez_job_run_id:
if kwargs.get('success'):
self.get_marquez_client().mark_job_run_completed(
self.get_marquez_client().mark_job_run_as_completed(
marquez_job_run_id)
else:
self.get_marquez_client().mark_job_run_failed(
self.get_marquez_client().mark_job_run_as_failed(
marquez_job_run_id)
state = 'COMPLETED' if kwargs.get('success') else 'FAILED'
self.log_marquez_event(['job_state_change',
Expand All @@ -119,5 +119,6 @@ def log_marquez_event(self, args):

def get_marquez_client(self):
if not self._marquez_client:
self._marquez_client = MarquezClient()
self._marquez_client = Client(
namespace_name=self.marquez_namespace)
return self._marquez_client
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@
url="https://github.com/MarquezProject/marquez-airflow",
packages=setuptools.find_packages(),
install_requires=[
"marquez-python==0.1.11",
"marquez-python==0.3.0"
],
)
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ apache-airflow==1.10.2
codecov
pytest
pytest-cov
requests
16 changes: 7 additions & 9 deletions test/test_dag_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
from airflow.utils.state import State
from contextlib import contextmanager
from datetime import datetime
from marquez_client.client import Client
from marquez_airflow import DAG
from marquez_client.marquez import MarquezClient
from unittest.mock import Mock, create_autospec, patch

import airflow.models
Expand Down Expand Up @@ -125,23 +125,21 @@ def test_default_namespace():
def assert_marquez_calls_for_dagrun(test_dag):
marquez_client = test_dag.marquez_dag._marquez_client

marquez_client.set_namespace.assert_called_with(
test_dag.marquez_dag.marquez_namespace)

marquez_client.create_job.assert_called_once_with(
test_dag.dag_id, test_dag.location, test_dag.input_urns,
test_dag.output_urns, test_dag.description)
test_dag.output_urns, description=test_dag.description)

marquez_client.create_job_run.assert_called_once_with(
test_dag.dag_id, "{}",
DAG.to_airflow_time(test_dag.start_date),
test_dag.marquez_dag.compute_endtime(test_dag.start_date))
test_dag.dag_id, job_run_args="{}",
nominal_start_time=DAG.to_airflow_time(test_dag.start_date),
nominal_end_time=test_dag.marquez_dag.compute_endtime(
test_dag.start_date))


def make_mock_marquez_client(run_id):
mock_marquez_jobrun = Mock()
mock_marquez_jobrun.run_id = run_id
mock_marquez_client = create_autospec(MarquezClient)
mock_marquez_client = create_autospec(Client)
mock_marquez_client.create_job_run.return_value = mock_marquez_jobrun
return mock_marquez_client

Expand Down

0 comments on commit 2d45b5c

Please sign in to comment.