Skip to content

Commit

Permalink
Fix formatting error in operator
Browse files Browse the repository at this point in the history
  • Loading branch information
denimalpaca committed Jul 21, 2022
1 parent 3c300e7 commit 5645b5d
Showing 1 changed file with 32 additions and 33 deletions.
65 changes: 32 additions & 33 deletions airflow/providers/common/sql/operators/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,38 +290,37 @@ def __init__(
# OpenLineage needs a valid SQL query with the input/output table(s) to parse
self.sql = f"SELECT * FROM {self.table};"


def execute(self, context=None):
hook = self.get_db_hook()
checks_sql = " UNION ALL ".join(
[
self.sql_check_template.replace("check_statement", value["check_statement"])
.replace("_check_name", check_name)
.replace("table", self.table)
for check_name, value in self.checks.items()
]
)
batch_statement = f"WHERE {self.batch}" if self.batch else ""
self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) {batch_statement};"
records = hook.get_pandas_df(self.sql)

if records.empty:
raise AirflowException(f"The following query returned zero rows: {self.sql}")

records.columns = records.columns.str.lower()
self.log.info("Record:\n%s", records)

for row in records.iterrows():
check = row[1].get("check_name")
result = row[1].get("check_result")
self.checks[check]["success"] = parse_boolean(str(result))

failed_tests = _get_failed_checks(self.checks)
if failed_tests:
raise AirflowException(
f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
"The following tests have failed:"
f"\n{', '.join(failed_tests)}"
def execute(self, context=None):
hook = self.get_db_hook()
checks_sql = " UNION ALL ".join(
[
self.sql_check_template.replace("check_statement", value["check_statement"])
.replace("_check_name", check_name)
.replace("table", self.table)
for check_name, value in self.checks.items()
]
)
batch_statement = f"WHERE {self.batch}" if self.batch else ""
self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) {batch_statement};"
records = hook.get_pandas_df(self.sql)

if records.empty:
raise AirflowException(f"The following query returned zero rows: {self.sql}")

records.columns = records.columns.str.lower()
self.log.info("Record:\n%s", records)

for row in records.iterrows():
check = row[1].get("check_name")
result = row[1].get("check_result")
self.checks[check]["success"] = parse_boolean(str(result))

self.log.info("All tests have passed")
failed_tests = _get_failed_checks(self.checks)
if failed_tests:
raise AirflowException(
f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
"The following tests have failed:"
f"\n{', '.join(failed_tests)}"
)

self.log.info("All tests have passed")

0 comments on commit 5645b5d

Please sign in to comment.