
Fix part of Google system tests #18494

Merged · 18 commits · Sep 29, 2021
Changes from 11 commits
@@ -97,7 +97,7 @@

# [START howto_operator_create_build_from_repo_body]
create_build_from_repo_body = {
"source": {"repo_source": {"repo_name": GCP_SOURCE_REPOSITORY_NAME, "branch_name": "master"}},
"source": {"repo_source": {"repo_name": GCP_SOURCE_REPOSITORY_NAME, "branch_name": "main"}},
"steps": [
{
"name": "gcr.io/cloud-builders/docker",
@@ -28,6 +28,7 @@
"""

import os
import random
from urllib.parse import urlsplit

from airflow import models
@@ -48,8 +49,8 @@
from airflow.utils.dates import days_ago

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME', 'test-mysql')
INSTANCE_NAME2 = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME2', 'test-mysql2')
INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME', 'test-mysql') + str(random.getrandbits(16))
Member

The unique value should be generated by the test run script, because the DAG file is loaded multiple times by Airflow, so we have to make sure that it has the same structure each time.

Member

I agree with @mik-laj - we use this value in at least two places, and it will be different every time.

Contributor Author (@mnojek), Sep 27, 2021

I added this because the instance name is unique only within a specific Airflow container, so each time I run the test within one Airflow run it stays the same. The problem was that if this system test failed before deleting the instance, it could not be run again, because it tried to create an instance with the same name while the old one was still present. That's why I added it - without the suffix, the test only works when it passes on the first execution.
That is a simple workaround to make each test execution independent, but overall it is not the desired solution. Can you point me to where else this value is used?

Ideally, I would like to have independent tests and also a mechanism that cleans up all the resources created by a test if it fails in the middle. That is not in the scope of this PR, but I plan to work on it in the near future.
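
(For illustration only - one possible shape for such a cleanup mechanism, assuming the gcloud CLI is available and authenticated in the test environment; the fixture name is made up:)

import os
import subprocess

import pytest

INSTANCE_NAME = os.environ.get("GCSQL_MYSQL_INSTANCE_NAME", "test-mysql")


@pytest.fixture
def cloudsql_instance_cleanup():
    # Run the test first, then always try to drop the instance, so a DAG that
    # fails half-way does not leave behind a name that blocks the next run.
    yield
    subprocess.run(
        ["gcloud", "sql", "instances", "delete", INSTANCE_NAME, "--quiet"],
        check=False,  # the instance may already be gone if the DAG finished cleanly
    )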

Member

Yes. It is HIGHLY annoying that with Cloud SQL a name cannot be reused for a day or so, even if the database is deleted.

However, I agree that random postfix generation in the DAG is a bad idea.

What we used to have in the past was a variables.env file in files/airflow-breeze-config, where we sourced files with variables, plus a very simple script that generated the random postfix if it was missing.

Then you could do step-by-step testing while keeping the randomly generated postfix, even across Breeze restarts.
When you needed to change the database name, you'd simply remove the file and it would be regenerated automatically at Breeze entry.

https://github.com/apache/airflow/blob/main/BREEZE.rst#customize-your-environment

Something like this might work in variables.env (writing it from memory, so I am not sure it is correct):

if [[ ! -f /files/random.env ]]; then 
   echo "export RANDOM_POSTFIX=${RANDOM}" > /files/random.env
fi
source /files/random.env

export GCSQL_MYSQL_INSTANCE_NAME="test-mysql-${RANDOM_POSTFIX}"

This has several nice properties:

  • everyone has their own random value
  • you keep it stable between runs or even between debug sessions - for example, you could run tasks from the example DAG separately, one by one
  • you can very easily regenerate the number by simply deleting /files/random.env

In the past, the whole airflow-breeze-config directory was actually a separately checked-out repository where we kept all the variables used by the team. This way you could share variables between users who have access to the same repo - and at the same time each user would have a different postfix, as random.env was not part of the repo.

Just an inspiration if you would like to optimize your development workflow.

Member

@mnojek Airflow is a distributed application, which means that one DAG file is loaded multiple times on different nodes, so we have to make sure that this instance name has the same value on all of them. These examples are used in system tests, where this condition is not strictly necessary because we have shared memory, but they are also an inspiration for novice users who may use another executor, e.g. CeleryExecutor, where each DAG is loaded on each node separately.
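
(A minimal sketch of keeping the name stable across DAG parses - assuming the test run script, or variables.env, exports the already-suffixed name before Airflow starts, so every scheduler and worker that parses the DAG resolves the same value:)

import os

# The random suffix is generated once outside the DAG (e.g. by the test run
# script) and exported as an environment variable, so every node that parses
# this file sees exactly the same instance names.
INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME', 'test-mysql')
INSTANCE_NAME2 = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME2', 'test-mysql2')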

Member

Here is an example project that we used to run system tests for Google Cloud on Google Cloud Build:

https://github.com/politools/airflow-system-tests

Contributor Author (@mnojek)

Thanks for the suggestions, @potiuk @mik-laj! I will not implement them now, but I will definitely use your comments when designing the new approach for running system tests. Hopefully we can make it better and more reliable 😃
In the meantime I will revert this change, and hopefully it will not be needed in the new approach.

INSTANCE_NAME2 = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME2', 'test-mysql2') + str(random.getrandbits(16))
DB_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'testdb')

EXPORT_URI = os.environ.get('GCSQL_MYSQL_EXPORT_URI', 'gs://INVALID BUCKET NAME/fileName')
@@ -46,7 +46,7 @@
from airflow.utils.dates import days_ago

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_REGION = os.environ.get('GCP_REGION', 'europe-west-1b')
GCP_REGION = os.environ.get('GCP_REGION', 'europe-west1')

GCSQL_POSTGRES_INSTANCE_NAME_QUERY = os.environ.get('GCSQL_POSTGRES_INSTANCE_NAME_QUERY', 'testpostgres')
GCSQL_POSTGRES_DATABASE_NAME = os.environ.get('GCSQL_POSTGRES_DATABASE_NAME', 'postgresdb')
@@ -19,6 +19,8 @@
"""
Example Airflow DAG that interacts with Google Data Catalog service
"""
import os

from google.cloud.datacatalog_v1beta1 import FieldType, TagField, TagTemplateField

from airflow import models
@@ -49,7 +51,8 @@
)
from airflow.utils.dates import days_ago

PROJECT_ID = "polidea-airflow"
PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_ID = os.getenv("GCP_TEST_DATA_BUCKET", "INVALID BUCKET NAME")
LOCATION = "us-central1"
ENTRY_GROUP_ID = "important_data_jan_2019"
ENTRY_ID = "python_files"
@@ -92,7 +95,7 @@
entry={
"display_name": "Wizard",
"type_": "FILESET",
"gcs_fileset_spec": {"file_patterns": ["gs://INVALID BUCKET NAME/**"]},
"gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_ID}/**"]},
},
)
# [END howto_operator_gcp_datacatalog_create_entry_gcs]
@@ -167,7 +167,7 @@
# [START howto_sensor_wait_for_job_status]
wait_for_python_job_async_done = DataflowJobStatusSensor(
task_id="wait-for-python-job-async-done",
job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
location='europe-west3',
)
@@ -191,7 +191,7 @@ def callback(metrics: List[Dict]) -> bool:

wait_for_python_job_async_metric = DataflowJobMetricsSensor(
task_id="wait-for-python-job-async-metric",
job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
location='europe-west3',
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
)
@@ -207,7 +207,7 @@ def check_message(messages: List[dict]) -> bool:

wait_for_python_job_async_message = DataflowJobMessagesSensor(
task_id="wait-for-python-job-async-message",
job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
location='europe-west3',
callback=check_message,
)
@@ -223,7 +223,7 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:

wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
task_id="wait-for-python-job-async-autoscaling-event",
job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
location='europe-west3',
callback=check_autoscaling_event,
)
20 changes: 13 additions & 7 deletions airflow/providers/google/cloud/example_dags/example_datafusion.py
@@ -39,21 +39,26 @@
from airflow.utils.state import State

# [START howto_data_fusion_env_variables]
SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
LOCATION = "europe-north1"
INSTANCE_NAME = "airflow-test-instance"
INSTANCE = {"type": "BASIC", "displayName": INSTANCE_NAME}
INSTANCE = {
"type": "BASIC",
"displayName": INSTANCE_NAME,
"dataprocServiceAccount": SERVICE_ACCOUNT,
}

BUCKET_1 = os.environ.get("GCP_DATAFUSION_BUCKET_1", "test-datafusion-bucket-1")
BUCKET_2 = os.environ.get("GCP_DATAFUSION_BUCKET_2", "test-datafusion-bucket-2")

BUCKET_1_URI = f"gs//{BUCKET_1}"
BUCKET_2_URI = f"gs//{BUCKET_2}"
BUCKET_1_URI = f"gs://{BUCKET_1}"
BUCKET_2_URI = f"gs://{BUCKET_2}"

PIPELINE_NAME = os.environ.get("GCP_DATAFUSION_PIPELINE_NAME", "airflow_test")
PIPELINE = {
"name": "test-pipe",
"description": "Data Pipeline Application",
"artifact": {"name": "cdap-data-pipeline", "version": "6.1.2", "scope": "SYSTEM"},
"artifact": {"name": "cdap-data-pipeline", "version": "6.4.1", "scope": "SYSTEM"},
"config": {
"resources": {"memoryMB": 2048, "virtualCores": 1},
"driverResources": {"memoryMB": 2048, "virtualCores": 1},
@@ -72,7 +77,7 @@
"label": "GCS",
"artifact": {
"name": "google-cloud",
"version": "0.14.2",
"version": "0.17.3",
"scope": "SYSTEM",
},
"properties": {
@@ -105,7 +110,7 @@
"label": "GCS2",
"artifact": {
"name": "google-cloud",
"version": "0.14.2",
"version": "0.17.3",
"scope": "SYSTEM",
},
"properties": {
@@ -176,7 +181,7 @@
location=LOCATION,
instance_name=INSTANCE_NAME,
instance=INSTANCE,
update_mask="instance.displayName",
update_mask="",
task_id="update_instance",
)
# [END howto_cloud_data_fusion_update_instance_operator]
@@ -223,6 +228,7 @@
pipeline_name=PIPELINE_NAME,
pipeline_id=start_pipeline_async.output,
expected_statuses=["COMPLETED"],
failure_statuses=["FAILED"],
instance_name=INSTANCE_NAME,
location=LOCATION,
)
Expand Up @@ -67,7 +67,7 @@
)
GCF_ZIP_PATH = os.environ.get('GCF_ZIP_PATH', '')
GCF_ENTRYPOINT = os.environ.get('GCF_ENTRYPOINT', 'helloWorld')
GCF_RUNTIME = 'nodejs6'
GCF_RUNTIME = 'nodejs14'
GCP_VALIDATE_BODY = os.environ.get('GCP_VALIDATE_BODY', "True") == "True"

# [START howto_operator_gcf_deploy_body]
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/datafusion.py
@@ -333,7 +333,7 @@ def create_pipeline(
url = os.path.join(self._base_url(instance_url, namespace), quote(pipeline_name))
response = self._cdap_request(url=url, method="PUT", body=pipeline)
if response.status != 200:
raise AirflowException(f"Creating a pipeline failed with code {response.status}")
raise AirflowException(f"Creating a pipeline failed with code {response.status} while calling {url}")

def delete_pipeline(
self,
8 changes: 8 additions & 0 deletions airflow/providers/google/cloud/sensors/datafusion.py
@@ -33,6 +33,8 @@ class CloudDataFusionPipelineStateSensor(BaseSensorOperator):
:type pipeline_name: str
:param expected_statuses: State that is expected
:type expected_statuses: set[str]
:param failure_statuses: State that will terminate the sensor with an exception
:type failure_statuses: set[str]
:param instance_name: The name of the instance.
:type instance_name: str
:param location: The Cloud Data Fusion location in which to handle the request.
@@ -68,6 +70,7 @@ def __init__(
pipeline_name: str,
pipeline_id: str,
expected_statuses: Set[str],
failure_statuses: Set[str],
instance_name: str,
location: str,
project_id: Optional[str] = None,
@@ -81,6 +84,7 @@
self.pipeline_name = pipeline_name
self.pipeline_id = pipeline_id
self.expected_statuses = expected_statuses
self.failure_statuses = failure_statuses
self.instance_name = instance_name
self.location = location
self.project_id = project_id
@@ -119,6 +123,10 @@ def poke(self, context: dict) -> bool:
except AirflowException:
pass # Because the pipeline may not be visible in system yet

if pipeline_status in self.failure_statuses:
raise AirflowException(f"Pipeline with id '{self.pipeline_id}' state is: {pipeline_status}. "
f"Terminating sensor...")

self.log.debug(
"Current status of the pipeline workflow for %s: %s.", self.pipeline_id, pipeline_status
)
@@ -79,7 +79,7 @@ def create_repository_and_bucket(self):
GCP_PROJECT_ID, GCP_REPOSITORY_NAME
)
self.execute_cmd(["git", "remote", "add", "origin", repo_url], cwd=tmp_dir)
self.execute_cmd(["git", "push", "--force", "origin", "main"], cwd=tmp_dir)
self.execute_cmd(["git", "push", "--force", "origin", "master"], cwd=tmp_dir)

def delete_repo(self):
"""Delete repository in Google Cloud Source Repository service"""
@@ -25,11 +25,11 @@
from tests.test_utils.db import clear_db_connections
from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest

TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
TOKEN = environ.get("DATAPREP_TOKEN")
EXTRA = {"extra__dataprep__token": TOKEN}


@pytest.mark.skipif(environ.get("DATAPREP_TOKEN") is None, reason='Dataprep token not present')
@pytest.mark.skipif(TOKEN is None, reason='Dataprep token not present')
class DataprepExampleDagsTest(GoogleSystemTest):
"""
System tests for Dataprep operators.
@@ -52,9 +52,12 @@ def create_test_file():

@staticmethod
def remove_test_files():
os.remove(PATH_TO_UPLOAD_FILE)
os.remove(PATH_TO_SAVED_FILE)
os.remove(PATH_TO_TRANSFORM_SCRIPT)
if os.path.exists(PATH_TO_UPLOAD_FILE):
os.remove(PATH_TO_UPLOAD_FILE)
if os.path.exists(PATH_TO_SAVED_FILE):
os.remove(PATH_TO_SAVED_FILE)
if os.path.exists(PATH_TO_TRANSFORM_SCRIPT):
os.remove(PATH_TO_TRANSFORM_SCRIPT)

def remove_bucket(self):
self.execute_cmd(["gsutil", "rm", "-r", f"gs://{BUCKET_1}"])