diff --git a/airflow/providers/common/sql/operators/sql.py b/airflow/providers/common/sql/operators/sql.py index 411f540f76808..4b93320d81084 100644 --- a/airflow/providers/common/sql/operators/sql.py +++ b/airflow/providers/common/sql/operators/sql.py @@ -290,38 +290,37 @@ def __init__( # OpenLineage needs a valid SQL query with the input/output table(s) to parse self.sql = f"SELECT * FROM {self.table};" - -def execute(self, context=None): - hook = self.get_db_hook() - checks_sql = " UNION ALL ".join( - [ - self.sql_check_template.replace("check_statement", value["check_statement"]) - .replace("_check_name", check_name) - .replace("table", self.table) - for check_name, value in self.checks.items() - ] - ) - batch_statement = f"WHERE {self.batch}" if self.batch else "" - self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) {batch_statement};" - records = hook.get_pandas_df(self.sql) - - if records.empty: - raise AirflowException(f"The following query returned zero rows: {self.sql}") - - records.columns = records.columns.str.lower() - self.log.info("Record:\n%s", records) - - for row in records.iterrows(): - check = row[1].get("check_name") - result = row[1].get("check_result") - self.checks[check]["success"] = parse_boolean(str(result)) - - failed_tests = _get_failed_checks(self.checks) - if failed_tests: - raise AirflowException( - f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n" - "The following tests have failed:" - f"\n{', '.join(failed_tests)}" + def execute(self, context=None): + hook = self.get_db_hook() + checks_sql = " UNION ALL ".join( + [ + self.sql_check_template.replace("check_statement", value["check_statement"]) + .replace("_check_name", check_name) + .replace("table", self.table) + for check_name, value in self.checks.items() + ] ) + batch_statement = f"WHERE {self.batch}" if self.batch else "" + self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) {batch_statement};" + records = hook.get_pandas_df(self.sql) + + if records.empty: + raise AirflowException(f"The following query returned zero rows: {self.sql}") + + records.columns = records.columns.str.lower() + self.log.info("Record:\n%s", records) + + for row in records.iterrows(): + check = row[1].get("check_name") + result = row[1].get("check_result") + self.checks[check]["success"] = parse_boolean(str(result)) - self.log.info("All tests have passed") + failed_tests = _get_failed_checks(self.checks) + if failed_tests: + raise AirflowException( + f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n" + "The following tests have failed:" + f"\n{', '.join(failed_tests)}" + ) + + self.log.info("All tests have passed")