Skip to content

Commit

Permalink
Add unit test for datafusion
Browse files Browse the repository at this point in the history
  • Loading branch information
mnojek committed Sep 27, 2021
1 parent e5830ed commit 31bd140
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 3 deletions.
4 changes: 3 additions & 1 deletion airflow/providers/google/cloud/hooks/datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,9 @@ def create_pipeline(
url = os.path.join(self._base_url(instance_url, namespace), quote(pipeline_name))
response = self._cdap_request(url=url, method="PUT", body=pipeline)
if response.status != 200:
raise AirflowException(f"Creating a pipeline failed with code {response.status} while calling {url}")
raise AirflowException(
f"Creating a pipeline failed with code {response.status} while calling {url}"
)

def delete_pipeline(
self,
Expand Down
6 changes: 4 additions & 2 deletions airflow/providers/google/cloud/sensors/datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ def poke(self, context: dict) -> bool:
pass # Because the pipeline may not be visible in system yet

if pipeline_status in self.failure_statuses:
raise AirflowException(f"Pipeline with id '{self.pipeline_id}' state is: {pipeline_status}. "
f"Terminating sensor...")
raise AirflowException(
f"Pipeline with id '{self.pipeline_id}' state is: {pipeline_status}. "
f"Terminating sensor..."
)

self.log.debug(
"Current status of the pipeline workflow for %s: %s.", self.pipeline_id, pipeline_status
Expand Down
3 changes: 3 additions & 0 deletions tests/providers/google/cloud/sensors/test_datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
GCP_CONN_ID = "test_conn_id"
DELEGATE_TO = "test_delegate_to"
IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
FAILURE_STATUSES = {"FAILED"}


class TestCloudDataFusionPipelineStateSensor(unittest.TestCase):
@parameterized.expand(
[
(PipelineStates.COMPLETED, PipelineStates.COMPLETED, True),
(PipelineStates.COMPLETED, PipelineStates.RUNNING, False),
(PipelineStates.COMPLETED, PipelineStates.FAILED, False),
]
)
@mock.patch("airflow.providers.google.cloud.sensors.datafusion.DataFusionHook")
Expand All @@ -52,6 +54,7 @@ def test_poke(self, expected_status, current_status, sensor_return, mock_hook):
pipeline_id=PIPELINE_ID,
project_id=PROJECT_ID,
expected_statuses=[expected_status],
failure_statuses=FAILURE_STATUSES,
instance_name=INSTANCE_NAME,
location=LOCATION,
gcp_conn_id=GCP_CONN_ID,
Expand Down

0 comments on commit 31bd140

Please sign in to comment.