diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_build.py b/airflow/providers/google/cloud/example_dags/example_cloud_build.py
index 7a6d8559c8b86..4908f23d10d14 100644
--- a/airflow/providers/google/cloud/example_dags/example_cloud_build.py
+++ b/airflow/providers/google/cloud/example_dags/example_cloud_build.py
@@ -97,7 +97,7 @@
 
 # [START howto_operator_create_build_from_repo_body]
 create_build_from_repo_body = {
-    "source": {"repo_source": {"repo_name": GCP_SOURCE_REPOSITORY_NAME, "branch_name": "master"}},
+    "source": {"repo_source": {"repo_name": GCP_SOURCE_REPOSITORY_NAME, "branch_name": "main"}},
     "steps": [
         {
             "name": "gcr.io/cloud-builders/docker",
diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_sql_query.py b/airflow/providers/google/cloud/example_dags/example_cloud_sql_query.py
index c0c26f568929e..098e8a3c48161 100644
--- a/airflow/providers/google/cloud/example_dags/example_cloud_sql_query.py
+++ b/airflow/providers/google/cloud/example_dags/example_cloud_sql_query.py
@@ -46,7 +46,7 @@
 from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
-GCP_REGION = os.environ.get('GCP_REGION', 'europe-west-1b')
+GCP_REGION = os.environ.get('GCP_REGION', 'europe-west1')
 
 GCSQL_POSTGRES_INSTANCE_NAME_QUERY = os.environ.get('GCSQL_POSTGRES_INSTANCE_NAME_QUERY', 'testpostgres')
 GCSQL_POSTGRES_DATABASE_NAME = os.environ.get('GCSQL_POSTGRES_DATABASE_NAME', 'postgresdb')
diff --git a/airflow/providers/google/cloud/example_dags/example_datacatalog.py b/airflow/providers/google/cloud/example_dags/example_datacatalog.py
index cd9b66d447c9f..e56bc10546f0f 100644
--- a/airflow/providers/google/cloud/example_dags/example_datacatalog.py
+++ b/airflow/providers/google/cloud/example_dags/example_datacatalog.py
@@ -19,6 +19,8 @@
 """
 Example Airflow DAG that interacts with Google Data Catalog service
 """
+import os
+
 from google.cloud.datacatalog_v1beta1 import FieldType, TagField, TagTemplateField
 
 from airflow import models
@@ -49,7 +51,8 @@
 )
 from airflow.utils.dates import days_ago
 
-PROJECT_ID = "polidea-airflow"
+PROJECT_ID = os.getenv("GCP_PROJECT_ID")
+BUCKET_ID = os.getenv("GCP_TEST_DATA_BUCKET", "INVALID BUCKET NAME")
 LOCATION = "us-central1"
 ENTRY_GROUP_ID = "important_data_jan_2019"
 ENTRY_ID = "python_files"
@@ -92,7 +95,7 @@
         entry={
             "display_name": "Wizard",
            "type_": "FILESET",
-            "gcs_fileset_spec": {"file_patterns": ["gs://INVALID BUCKET NAME/**"]},
+            "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_ID}/**"]},
         },
     )
     # [END howto_operator_gcp_datacatalog_create_entry_gcs]
diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow.py b/airflow/providers/google/cloud/example_dags/example_dataflow.py
index 531ea5ac7ff57..0c9aceb631c2c 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataflow.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataflow.py
@@ -152,6 +152,7 @@
     # [START howto_operator_start_python_job_async]
     start_python_job_async = BeamRunPythonPipelineOperator(
         task_id="start-python-job-async",
+        runner="DataflowRunner",
         py_file=GCS_PYTHON,
         py_options=[],
         pipeline_options={
@@ -160,14 +161,18 @@
         py_requirements=['apache-beam[gcp]==2.25.0'],
         py_interpreter='python3',
         py_system_site_packages=False,
-        dataflow_config={"location": 'europe-west3', "wait_until_finished": False},
+        dataflow_config={
+            "job_name": "start-python-job-async",
+            "location": 'europe-west3',
+            "wait_until_finished": False,
+        },
     )
     # [END howto_operator_start_python_job_async]
 
     # [START howto_sensor_wait_for_job_status]
     wait_for_python_job_async_done = DataflowJobStatusSensor(
         task_id="wait-for-python-job-async-done",
-        job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
+        job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
         expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
         location='europe-west3',
     )
@@ -191,9 +196,10 @@ def callback(metrics: List[Dict]) -> bool:
 
     wait_for_python_job_async_metric = DataflowJobMetricsSensor(
         task_id="wait-for-python-job-async-metric",
-        job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
+        job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
         location='europe-west3',
         callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
+        fail_on_terminal_state=False,
     )
     # [END howto_sensor_wait_for_job_metric]
 
@@ -207,9 +213,10 @@ def check_message(messages: List[dict]) -> bool:
 
     wait_for_python_job_async_message = DataflowJobMessagesSensor(
         task_id="wait-for-python-job-async-message",
-        job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
+        job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
         location='europe-west3',
         callback=check_message,
+        fail_on_terminal_state=False,
     )
     # [END howto_sensor_wait_for_job_message]
 
@@ -223,9 +230,10 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
 
     wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
         task_id="wait-for-python-job-async-autoscaling-event",
-        job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
+        job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
         location='europe-west3',
         callback=check_autoscaling_event,
+        fail_on_terminal_state=False,
     )
     # [END howto_sensor_wait_for_job_autoscaling_event]
 
diff --git a/airflow/providers/google/cloud/example_dags/example_datafusion.py b/airflow/providers/google/cloud/example_dags/example_datafusion.py
index 8b5398f429589..4900e6b2cef6c 100644
--- a/airflow/providers/google/cloud/example_dags/example_datafusion.py
+++ b/airflow/providers/google/cloud/example_dags/example_datafusion.py
@@ -39,21 +39,26 @@
 from airflow.utils.state import State
 
 # [START howto_data_fusion_env_variables]
+SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
 LOCATION = "europe-north1"
 INSTANCE_NAME = "airflow-test-instance"
-INSTANCE = {"type": "BASIC", "displayName": INSTANCE_NAME}
+INSTANCE = {
+    "type": "BASIC",
+    "displayName": INSTANCE_NAME,
+    "dataprocServiceAccount": SERVICE_ACCOUNT,
+}
 
 BUCKET_1 = os.environ.get("GCP_DATAFUSION_BUCKET_1", "test-datafusion-bucket-1")
 BUCKET_2 = os.environ.get("GCP_DATAFUSION_BUCKET_2", "test-datafusion-bucket-2")
 
-BUCKET_1_URI = f"gs//{BUCKET_1}"
-BUCKET_2_URI = f"gs//{BUCKET_2}"
+BUCKET_1_URI = f"gs://{BUCKET_1}"
+BUCKET_2_URI = f"gs://{BUCKET_2}"
 
 PIPELINE_NAME = os.environ.get("GCP_DATAFUSION_PIPELINE_NAME", "airflow_test")
 PIPELINE = {
     "name": "test-pipe",
     "description": "Data Pipeline Application",
-    "artifact": {"name": "cdap-data-pipeline", "version": "6.1.2", "scope": "SYSTEM"},
+    "artifact": {"name": "cdap-data-pipeline", "version": "6.4.1", "scope": "SYSTEM"},
     "config": {
         "resources": {"memoryMB": 2048, "virtualCores": 1},
         "driverResources": {"memoryMB": 2048, "virtualCores": 1},
@@ -72,7 +77,7 @@
                     "label": "GCS",
                     "artifact": {
                         "name": "google-cloud",
-                        "version": "0.14.2",
+                        "version": "0.17.3",
                         "scope": "SYSTEM",
                     },
                     "properties": {
@@ -105,7 +110,7 @@
                     "label": "GCS2",
                     "artifact": {
                         "name": "google-cloud",
-                        "version": "0.14.2",
+                        "version": "0.17.3",
                         "scope": "SYSTEM",
                     },
                     "properties": {
@@ -176,7 +181,7 @@
         location=LOCATION,
         instance_name=INSTANCE_NAME,
         instance=INSTANCE,
-        update_mask="instance.displayName",
+        update_mask="",
         task_id="update_instance",
     )
     # [END howto_cloud_data_fusion_update_instance_operator]
@@ -223,6 +228,7 @@
         pipeline_name=PIPELINE_NAME,
         pipeline_id=start_pipeline_async.output,
         expected_statuses=["COMPLETED"],
+        failure_statuses=["FAILED"],
         instance_name=INSTANCE_NAME,
         location=LOCATION,
     )
diff --git a/airflow/providers/google/cloud/example_dags/example_functions.py b/airflow/providers/google/cloud/example_dags/example_functions.py
index 7c6cdb8ea7410..03749baf7f353 100644
--- a/airflow/providers/google/cloud/example_dags/example_functions.py
+++ b/airflow/providers/google/cloud/example_dags/example_functions.py
@@ -67,7 +67,7 @@
 )
 GCF_ZIP_PATH = os.environ.get('GCF_ZIP_PATH', '')
 GCF_ENTRYPOINT = os.environ.get('GCF_ENTRYPOINT', 'helloWorld')
-GCF_RUNTIME = 'nodejs6'
+GCF_RUNTIME = 'nodejs14'
 GCP_VALIDATE_BODY = os.environ.get('GCP_VALIDATE_BODY', "True") == "True"
 
 # [START howto_operator_gcf_deploy_body]
diff --git a/airflow/providers/google/cloud/hooks/datafusion.py b/airflow/providers/google/cloud/hooks/datafusion.py
index fefd5ce6d84ed..b6321dc3f5d8c 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -333,7 +333,9 @@ def create_pipeline(
         url = os.path.join(self._base_url(instance_url, namespace), quote(pipeline_name))
         response = self._cdap_request(url=url, method="PUT", body=pipeline)
         if response.status != 200:
-            raise AirflowException(f"Creating a pipeline failed with code {response.status}")
+            raise AirflowException(
+                f"Creating a pipeline failed with code {response.status} while calling {url}"
+            )
 
     def delete_pipeline(
         self,
diff --git a/airflow/providers/google/cloud/sensors/datafusion.py b/airflow/providers/google/cloud/sensors/datafusion.py
index c57d3f0c35a03..54c7e4ee94b32 100644
--- a/airflow/providers/google/cloud/sensors/datafusion.py
+++ b/airflow/providers/google/cloud/sensors/datafusion.py
@@ -33,6 +33,8 @@ class CloudDataFusionPipelineStateSensor(BaseSensorOperator):
     :type pipeline_name: str
     :param expected_statuses: State that is expected
     :type expected_statuses: set[str]
+    :param failure_statuses: State that will terminate the sensor with an exception
+    :type failure_statuses: set[str]
     :param instance_name: The name of the instance.
     :type instance_name: str
     :param location: The Cloud Data Fusion location in which to handle the request.
@@ -70,6 +72,7 @@ def __init__(
         expected_statuses: Set[str],
         instance_name: str,
         location: str,
+        failure_statuses: Set[str] = None,
         project_id: Optional[str] = None,
         namespace: str = "default",
         gcp_conn_id: str = 'google_cloud_default',
@@ -81,6 +84,7 @@
         self.pipeline_name = pipeline_name
         self.pipeline_id = pipeline_id
         self.expected_statuses = expected_statuses
+        self.failure_statuses = failure_statuses
         self.instance_name = instance_name
         self.location = location
         self.project_id = project_id
@@ -119,6 +123,12 @@ def poke(self, context: dict) -> bool:
             except AirflowException:
                 pass  # Because the pipeline may not be visible in system yet
 
+        if self.failure_statuses and pipeline_status in self.failure_statuses:
+            raise AirflowException(
+                f"Pipeline with id '{self.pipeline_id}' state is: {pipeline_status}. "
+                f"Terminating sensor..."
+            )
+
         self.log.debug(
             "Current status of the pipeline workflow for %s: %s.", self.pipeline_id, pipeline_status
         )
diff --git a/tests/providers/google/cloud/operators/test_cloud_build_system_helper.py b/tests/providers/google/cloud/operators/test_cloud_build_system_helper.py
index c70db9b019ced..5a1c8499040e3 100755
--- a/tests/providers/google/cloud/operators/test_cloud_build_system_helper.py
+++ b/tests/providers/google/cloud/operators/test_cloud_build_system_helper.py
@@ -79,7 +79,7 @@ def create_repository_and_bucket(self):
             GCP_PROJECT_ID, GCP_REPOSITORY_NAME
         )
         self.execute_cmd(["git", "remote", "add", "origin", repo_url], cwd=tmp_dir)
-        self.execute_cmd(["git", "push", "--force", "origin", "main"], cwd=tmp_dir)
+        self.execute_cmd(["git", "push", "--force", "origin", "master"], cwd=tmp_dir)
 
     def delete_repo(self):
         """Delete repository in Google Cloud Source Repository service"""
diff --git a/tests/providers/google/cloud/operators/test_dataprep_system.py b/tests/providers/google/cloud/operators/test_dataprep_system.py
index e6cd98cd3dca8..3b4c54c7ef55a 100644
--- a/tests/providers/google/cloud/operators/test_dataprep_system.py
+++ b/tests/providers/google/cloud/operators/test_dataprep_system.py
@@ -25,11 +25,11 @@
 from tests.test_utils.db import clear_db_connections
 from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
 
-TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+TOKEN = environ.get("DATAPREP_TOKEN")
 EXTRA = {"extra__dataprep__token": TOKEN}
 
 
-@pytest.mark.skipif(environ.get("DATAPREP_TOKEN") is None, reason='Dataprep token not present')
+@pytest.mark.skipif(TOKEN is None, reason='Dataprep token not present')
 class DataprepExampleDagsTest(GoogleSystemTest):
     """
     System tests for Dataprep operators.
diff --git a/tests/providers/google/cloud/operators/test_gcs_system_helper.py b/tests/providers/google/cloud/operators/test_gcs_system_helper.py
index 3d69f62ee8325..a105f7ecc9ba2 100644
--- a/tests/providers/google/cloud/operators/test_gcs_system_helper.py
+++ b/tests/providers/google/cloud/operators/test_gcs_system_helper.py
@@ -52,9 +52,12 @@ def create_test_file():
 
     @staticmethod
     def remove_test_files():
-        os.remove(PATH_TO_UPLOAD_FILE)
-        os.remove(PATH_TO_SAVED_FILE)
-        os.remove(PATH_TO_TRANSFORM_SCRIPT)
+        if os.path.exists(PATH_TO_UPLOAD_FILE):
+            os.remove(PATH_TO_UPLOAD_FILE)
+        if os.path.exists(PATH_TO_SAVED_FILE):
+            os.remove(PATH_TO_SAVED_FILE)
+        if os.path.exists(PATH_TO_TRANSFORM_SCRIPT):
+            os.remove(PATH_TO_TRANSFORM_SCRIPT)
 
     def remove_bucket(self):
         self.execute_cmd(["gsutil", "rm", "-r", f"gs://{BUCKET_1}"])
diff --git a/tests/providers/google/cloud/sensors/test_datafusion.py b/tests/providers/google/cloud/sensors/test_datafusion.py
index aeff6a57dbced..e0762b13861a5 100644
--- a/tests/providers/google/cloud/sensors/test_datafusion.py
+++ b/tests/providers/google/cloud/sensors/test_datafusion.py
@@ -19,8 +19,10 @@
 import unittest
 from unittest import mock
 
+import pytest
 from parameterized.parameterized import parameterized
 
+from airflow import AirflowException
 from airflow.providers.google.cloud.hooks.datafusion import PipelineStates
 from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor
 
@@ -33,6 +35,7 @@
 GCP_CONN_ID = "test_conn_id"
 DELEGATE_TO = "test_delegate_to"
 IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+FAILURE_STATUSES = {"FAILED"}
 
 
 class TestCloudDataFusionPipelineStateSensor(unittest.TestCase):
@@ -51,7 +54,7 @@ def test_poke(self, expected_status, current_status, sensor_return, mock_hook):
             pipeline_name=PIPELINE_NAME,
             pipeline_id=PIPELINE_ID,
             project_id=PROJECT_ID,
-            expected_statuses=[expected_status],
+            expected_statuses={expected_status},
             instance_name=INSTANCE_NAME,
             location=LOCATION,
             gcp_conn_id=GCP_CONN_ID,
@@ -73,3 +76,28 @@ def test_poke(self, expected_status, current_status, sensor_return, mock_hook):
         mock_hook.return_value.get_instance.assert_called_once_with(
             instance_name=INSTANCE_NAME, location=LOCATION, project_id=PROJECT_ID
         )
+
+    @mock.patch("airflow.providers.google.cloud.sensors.datafusion.DataFusionHook")
+    def test_assertion(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = {"apiEndpoint": INSTANCE_URL}
+
+        task = CloudDataFusionPipelineStateSensor(
+            task_id="test_task_id",
+            pipeline_name=PIPELINE_NAME,
+            pipeline_id=PIPELINE_ID,
+            project_id=PROJECT_ID,
+            expected_statuses={PipelineStates.COMPLETED},
+            failure_statuses=FAILURE_STATUSES,
+            instance_name=INSTANCE_NAME,
+            location=LOCATION,
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        with pytest.raises(
+            AirflowException,
+            match=f"Pipeline with id '{PIPELINE_ID}' state is: FAILED. Terminating sensor...",
+        ):
+            mock_hook.return_value.get_pipeline_workflow.return_value = {"status": 'FAILED'}
+            task.poke(mock.MagicMock())
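
For reference, a minimal sketch of how the failure_statuses argument added to CloudDataFusionPipelineStateSensor in this patch might be wired into a DAG. The sensor arguments mirror the patch above; the DAG id, schedule, start date, and the placeholder pipeline, run id, and instance values are illustrative assumptions, not values taken from this change.

# Illustrative sketch only: shows the failure_statuses argument introduced in this patch.
# DAG id, schedule, and placeholder values below are assumptions.
from airflow import models
from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor
from airflow.utils.dates import days_ago

with models.DAG(
    "example_datafusion_sensor_sketch",  # hypothetical DAG id
    schedule_interval=None,
    start_date=days_ago(1),
) as dag:
    wait_for_pipeline = CloudDataFusionPipelineStateSensor(
        task_id="pipeline_state_sensor",
        pipeline_name="airflow_test",  # placeholder pipeline name
        pipeline_id="some-pipeline-run-id",  # placeholder; normally pulled from the start task's output
        expected_statuses=["COMPLETED"],
        # New in this patch: poke() now raises AirflowException as soon as the pipeline
        # reaches one of these states, instead of poking until the sensor times out.
        failure_statuses=["FAILED"],
        instance_name="airflow-test-instance",  # placeholder instance name
        location="europe-north1",
    )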