Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AirbyteHook add cancel job option #24593

Merged
merged 13 commits into from
Jun 29, 2022
12 changes: 12 additions & 0 deletions airflow/providers/airbyte/hooks/airbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ def get_job(self, job_id: int) -> Any:
headers={"accept": "application/json"},
)


def cancel_job(self, job_id: int) -> Any:
    """
    Ask the Airbyte server to cancel a job.

    :param job_id: Required. Id of the Airbyte job
    :return: whatever ``self.run`` returns for the cancel request
    """
    cancel_endpoint = f"api/{self.api_version}/jobs/cancel"
    payload = {"id": job_id}
    return self.run(
        endpoint=cancel_endpoint,
        json=payload,
        headers={"accept": "application/json"},
    )

def test_connection(self):
"""Tests the Airbyte connection by hitting the health API"""
self.method = 'GET'
Expand Down
25 changes: 17 additions & 8 deletions airflow/providers/airbyte/operators/airbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,23 @@ def __init__(

def execute(self, context: 'Context') -> None:
"""Create Airbyte Job and wait to finish"""
# NOTE(review): this span is a flattened diff hunk (the file header above reports
# "17 additions & 8 deletions"), so each statement appears twice with no +/- markers.
# The variants using the locals `hook` / `job_id` appear to be the removed side; the
# variants using `self.hook` / `self.job_id` appear to be the added side.
# NOTE(review): the signature is annotated `-> None`, yet both variants return the job id.
# Apparent pre-change side: hook and job id held in locals only.
hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
job_object = hook.submit_sync_connection(connection_id=self.connection_id)
job_id = job_object.json()['job']['id']
# Apparent post-change side: hook and job id stored on the operator so that on_kill,
# which reads self.job_id and self.hook, can cancel the job.
self.hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
job_object = self.hook.submit_sync_connection(connection_id=self.connection_id)
self.job_id = job_object.json()['job']['id']

# Same log message in both variants; only the source of the job id differs.
self.log.info("Job %s was submitted to Airbyte Server", job_id)
self.log.info("Job %s was submitted to Airbyte Server", self.job_id)
# Waiting is skipped when self.asynchronous is truthy.
if not self.asynchronous:
# Apparent pre-change side of the wait.
self.log.info('Waiting for job %s to complete', job_id)
hook.wait_for_job(job_id=job_id, wait_seconds=self.wait_seconds, timeout=self.timeout)
self.log.info('Job %s completed successfully', job_id)
# Apparent post-change side of the wait.
self.log.info('Waiting for job %s to complete', self.job_id)
self.hook.wait_for_job(job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout)
self.log.info('Job %s completed successfully', self.job_id)

# Pre-change return followed by post-change return.
return job_id
return self.job_id


def on_kill(self):
    """
    Cancel the Airbyte job if the task is killed.

    ``self.job_id`` and ``self.hook`` are only assigned inside ``execute``.
    If the kill arrives before the job has been submitted, neither attribute
    exists yet, so this method returns without doing anything instead of
    raising ``AttributeError``.
    """
    # Read defensively: on_kill can fire before execute has stored these.
    job_id = getattr(self, "job_id", None)
    hook = getattr(self, "hook", None)
    if not job_id or hook is None:
        return

    self.log.info('on_kill: cancel the airbyte Job %s', job_id)
    # Keyword call, matching how execute invokes wait_for_job(job_id=...).
    hook.cancel_job(job_id=job_id)
9 changes: 9 additions & 0 deletions tests/providers/airbyte/hooks/test_airbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ class TestAirbyteHook(unittest.TestCase):
job_id = 1
sync_connection_endpoint = 'http://test-airbyte:8001/api/v1/connections/sync'
get_job_endpoint = 'http://test-airbyte:8001/api/v1/jobs/get'
cancel_job_endpoint = 'http://test-airbyte:8001/api/v1/jobs/cancel'

health_endpoint = 'http://test-airbyte:8001/api/v1/health'
_mock_sync_conn_success_response_body = {'job': {'id': 1}}
_mock_job_status_success_response_body = {'job': {'status': 'succeeded'}}
_mock_job_cancel_status='cancelled'

def setUp(self):
db.merge_conn(
Expand Down Expand Up @@ -71,6 +74,12 @@ def test_get_job_status(self, m):
assert resp.status_code == 200
assert resp.json() == self._mock_job_status_success_response_body

@requests_mock.mock()
def test_cancel_job(self, m):
    """Cancelling a job through the hook should yield an HTTP 200 response."""
    m.post(
        self.cancel_job_endpoint,
        status_code=200,
        json=self._mock_job_status_success_response_body,
    )
    response = self.hook.cancel_job(job_id=self.job_id)
    assert response.status_code == 200

@mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
def test_wait_for_job_succeeded(self, mock_get_job):
mock_get_job.side_effect = [self.return_value_get_job(self.hook.SUCCEEDED)]
Expand Down
17 changes: 17 additions & 0 deletions tests/providers/airbyte/operators/test_airbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,20 @@ def test_execute(self, mock_wait_for_job, mock_submit_sync_connection):
mock_wait_for_job.assert_called_once_with(
job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout
)

@mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.wait_for_job', return_value=None)
@mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.submit_sync_connection')
@mock.patch(
    'airflow.providers.airbyte.hooks.airbyte.AirbyteHook.cancel_job',
    autospec=True,
    return_value=None,
)
def test_on_kill(self, mock_cancel_job, mock_submit_sync_connection, mock_wait_for_job):
    """
    on_kill must cancel the job that execute submitted.

    wait_for_job is patched (as in test_execute) so that execute does not
    poll a real Airbyte server before on_kill is exercised.
    """
    mock_submit_sync_connection.return_value = mock.Mock(
        **{'json.return_value': {'job': {'id': self.job_id}}}
    )
    op = AirbyteTriggerSyncOperator(
        task_id='test_Airbyte_cancel',
        airbyte_conn_id=self.airbyte_conn_id,
        connection_id=self.connection_id,
        wait_seconds=self.wait_seconds,
        timeout=self.timeout,
    )
    op.execute({})
    op.on_kill()
    # autospec binds the recorded call to cancel_job's real signature, so the
    # assertion holds whether on_kill passes job_id positionally or by keyword.
    # mock.ANY stands in for the hook instance bound as `self`.
    mock_cancel_job.assert_called_once_with(mock.ANY, job_id=self.job_id)