Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix template rendering for Common SQL operators #28202

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions airflow/providers/common/sql/operators/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ class SQLColumnCheckOperator(BaseSQLOperator):
:ref:`howto/operator:SQLColumnCheckOperator`
"""

template_fields = ("partition_clause",)
template_fields = ("partition_clause", "table", "sql")
template_fields_renderers = {"sql": "sql"}

sql_check_template = """
SELECT '{column}' AS col_name, '{check}' AS check_type, {column}_{check} AS check_result
Expand Down Expand Up @@ -550,7 +551,9 @@ class SQLTableCheckOperator(BaseSQLOperator):
:ref:`howto/operator:SQLTableCheckOperator`
"""

template_fields = ("partition_clause",)
template_fields = ("partition_clause", "table", "sql")

template_fields_renderers = {"sql": "sql"}

sql_check_template = """
SELECT '{check_name}' AS check_name, MIN({check_name}) AS check_result
Expand Down Expand Up @@ -603,6 +606,8 @@ def execute(self, context: Context):
self.log.info("All tests have passed")

def _generate_sql_query(self):
self.log.info("Partition clause: %s", self.partition_clause)

def _generate_partition_clause(check_name):
if self.partition_clause and "partition_clause" not in self.checks[check_name]:
return f"WHERE {self.partition_clause}"
Expand Down Expand Up @@ -953,24 +958,27 @@ def __init__(
):
super().__init__(conn_id=conn_id, database=database, **kwargs)
self.sql = sql
self.min_threshold = _convert_to_float_if_possible(min_threshold)
self.max_threshold = _convert_to_float_if_possible(max_threshold)
self.min_threshold = min_threshold
self.max_threshold = max_threshold

def execute(self, context: Context):
hook = self.get_db_hook()
result = hook.get_first(self.sql)[0]
if not result:
self._raise_exception(f"The following query returned zero rows: {self.sql}")

if isinstance(self.min_threshold, float):
lower_bound = self.min_threshold
min_threshold = _convert_to_float_if_possible(self.min_threshold)
max_threshold = _convert_to_float_if_possible(self.max_threshold)

if isinstance(min_threshold, float):
lower_bound = min_threshold
else:
lower_bound = hook.get_first(self.min_threshold)[0]
lower_bound = hook.get_first(min_threshold)[0]

if isinstance(self.max_threshold, float):
upper_bound = self.max_threshold
if isinstance(max_threshold, float):
upper_bound = max_threshold
else:
upper_bound = hook.get_first(self.max_threshold)[0]
upper_bound = hook.get_first(max_threshold)[0]

meta_data = {
"result": result,
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/common/sql/operators/sql.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class SQLExecuteQueryOperator(BaseSQLOperator):

class SQLColumnCheckOperator(BaseSQLOperator):
template_fields: Incomplete
template_fields_renderers: Incomplete
sql_check_template: str
column_checks: Incomplete
table: Incomplete
Expand All @@ -102,6 +103,7 @@ class SQLColumnCheckOperator(BaseSQLOperator):

class SQLTableCheckOperator(BaseSQLOperator):
template_fields: Incomplete
template_fields_renderers: Incomplete
sql_check_template: str
table: Incomplete
checks: Incomplete
Expand Down
134 changes: 134 additions & 0 deletions tests/providers/common/sql/operators/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ def get_records(*arg):
monkeypatch.setattr(MockHook, "get_records", get_records)
return operator

def _full_check_sql(self, sql: str) -> str:
"""
Wraps the check fragment in the outer parts of the sql query
"""
return f"SELECT col_name, check_type, check_result FROM ({sql}) AS check_columns"

def test_check_not_in_column_checks(self, monkeypatch):
with pytest.raises(AirflowException, match="Invalid column check: invalid_check_name."):
self._construct_operator(monkeypatch, self.invalid_column_mapping, ())
Expand Down Expand Up @@ -246,6 +252,16 @@ def test_generate_sql_query_with_partitions(self, monkeypatch):
== self.correct_generate_sql_query_with_partition.lstrip()
)

def test_generate_sql_query_with_templated_partitions(self, monkeypatch):
checks = self.short_valid_column_mapping["X"]
operator = self._construct_operator(monkeypatch, self.short_valid_column_mapping, ())
operator.partition_clause = "{{ params.col }} > 1"
operator.render_template_fields({"params": {"col": "Y"}})
assert (
operator._generate_sql_query("X", checks).lstrip()
== self.correct_generate_sql_query_with_partition.lstrip()
)

def test_generate_sql_query_with_partitions_and_check_partition(self, monkeypatch):
self.short_valid_column_mapping["X"]["null_check"]["partition_clause"] = "Z < 100"
checks = self.short_valid_column_mapping["X"]
Expand All @@ -267,6 +283,55 @@ def test_generate_sql_query_with_check_partition(self, monkeypatch):
)
del self.short_valid_column_mapping["X"]["distinct_check"]["partition_clause"]

@mock.patch.object(SQLColumnCheckOperator, "get_db_hook")
def test_generated_sql_respects_templated_partitions(self, mock_get_db_hook):
records = [
("X", "null_check", 0),
("X", "distinct_check", 10),
]

mock_hook = mock.Mock()
mock_hook.get_records.return_value = records
mock_get_db_hook.return_value = mock_hook

operator = SQLColumnCheckOperator(
task_id="test_task",
table="test_table",
column_mapping=self.short_valid_column_mapping,
partition_clause="{{ params.col }} > 1",
)
operator.render_template_fields({"params": {"col": "Y"}})

operator.execute(context=MagicMock())

mock_get_db_hook.return_value.get_records.assert_called_once_with(
self._full_check_sql(self.correct_generate_sql_query_with_partition),
)

@mock.patch.object(SQLColumnCheckOperator, "get_db_hook")
def test_generated_sql_respects_templated_table(self, mock_get_db_hook):
records = [
("X", "null_check", 0),
("X", "distinct_check", 10),
]

mock_hook = mock.Mock()
mock_hook.get_records.return_value = records
mock_get_db_hook.return_value = mock_hook

operator = SQLColumnCheckOperator(
task_id="test_task",
table="{{ params.table }}",
column_mapping=self.short_valid_column_mapping,
)
operator.render_template_fields({"params": {"table": "test_table"}})

operator.execute(context=MagicMock())

mock_get_db_hook.return_value.get_records.assert_called_once_with(
self._full_check_sql(self.correct_generate_sql_query_no_partitions),
)


class TestTableCheckOperator:

Expand Down Expand Up @@ -363,6 +428,48 @@ def test_sql_check(self, conn_id):
finally:
hook.run(["DROP TABLE employees"])

@pytest.mark.parametrize(
["conn_id"],
[
pytest.param("postgres_default", marks=[pytest.mark.backend("postgres")]),
pytest.param("mysql_default", marks=[pytest.mark.backend("mysql")]),
],
)
def test_sql_check_partition_clause_templating(self, conn_id):
"""
Checks that the generated sql respects a templated partition clause
"""
operator = SQLTableCheckOperator(
task_id="test_task",
table="employees",
checks={"row_count_check": {"check_statement": "COUNT(*) = 5"}},
conn_id=conn_id,
partition_clause="employment_year = {{ params.year }}",
)

hook = operator.get_db_hook()
hook.run(
[
"""
CREATE TABLE IF NOT EXISTS employees (
employee_name VARCHAR(50) NOT NULL,
employment_year INT NOT NULL
);
""",
"INSERT INTO employees VALUES ('Adam', 2021)",
"INSERT INTO employees VALUES ('Chris', 2021)",
"INSERT INTO employees VALUES ('Frank', 2021)",
"INSERT INTO employees VALUES ('Fritz', 2021)",
"INSERT INTO employees VALUES ('Magda', 2022)",
"INSERT INTO employees VALUES ('Phil', 2021)",
]
)
try:
operator.render_template_fields({"params": {"year": 2021}})
operator.execute({})
finally:
hook.run(["DROP TABLE employees"])

def test_pass_all_checks_check(self, monkeypatch):
records = [("row_count_check", 1), ("column_sum_check", "y")]
operator = self._construct_operator(monkeypatch, self.checks, records)
Expand All @@ -388,6 +495,22 @@ def test_generate_sql_query_with_partitions(self, monkeypatch):
operator._generate_sql_query().lstrip() == self.correct_generate_sql_query_with_partition.lstrip()
)

def test_generate_sql_query_with_templated_partitions(self, monkeypatch):
operator = self._construct_operator(monkeypatch, self.checks, ())
operator.partition_clause = "{{ params.col }} > 10"
operator.render_template_fields({"params": {"col": "col_a"}})
assert (
operator._generate_sql_query().lstrip() == self.correct_generate_sql_query_with_partition.lstrip()
)

def test_generate_sql_query_with_templated_table(self, monkeypatch):
operator = self._construct_operator(monkeypatch, self.checks, ())
operator.table = "{{ params.table }}"
operator.render_template_fields({"params": {"table": "test_table"}})
assert (
operator._generate_sql_query().lstrip() == self.correct_generate_sql_query_no_partitions.lstrip()
)

def test_generate_sql_query_with_partitions_and_check_partition(self, monkeypatch):
self.checks["row_count_check"]["partition_clause"] = "id = 100"
operator = self._construct_operator(monkeypatch, self.checks, ())
Expand Down Expand Up @@ -703,6 +826,17 @@ def test_pass_min_value_max_value(self, mock_get_db_hook):

operator.execute(context=MagicMock())

@mock.patch.object(SQLThresholdCheckOperator, "get_db_hook")
def test_pass_min_value_max_value_templated(self, mock_get_db_hook):
mock_hook = mock.Mock()
mock_hook.get_first.return_value = (10,)
mock_get_db_hook.return_value = mock_hook

operator = self._construct_operator("Select avg(val) from table1 limit 1", "{{ params.min }}", 100)
operator.render_template_fields({"params": {"min": 1}})
operator.execute(context=MagicMock())
mock_hook.get_first.assert_called_once_with("Select avg(val) from table1 limit 1")

@mock.patch.object(SQLThresholdCheckOperator, "get_db_hook")
def test_fail_min_value_max_value(self, mock_get_db_hook):
mock_hook = mock.Mock()
Expand Down