Skip to content

Commit

Permalink
fix: validations custom query for mssql, mysql & DB2 (#268)
Browse files Browse the repository at this point in the history
* fix: numeric validation custom query for mssql,mysql & DB2

* fix: Reliability, Uniqueness and Completeness validation for mssql, mysql & db2

* fix: some validation matrices

* fix: ssn metrices

* fix: Contact info metrices

* fix: Contact info metrices

* fix: sedol

* fix: validity metrices

* typo: validty doc

* doc: update examples

* fix: mysql ssl connectivity

* fix: mysql ssl security
  • Loading branch information
Ryuk-me authored Nov 15, 2024
1 parent 52029ca commit 6ecd77c
Show file tree
Hide file tree
Showing 12 changed files with 1,620 additions and 960 deletions.
20 changes: 10 additions & 10 deletions dcs_core/core/common/models/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ class ValidationFunction(str, Enum):
MAX = "max"
AVG = "avg"
SUM = "sum"
MEDIAN = "median"
MEDIAN = "median" # todo not implemented
STDDEV = "stddev"
VARIANCE = "variance"
COUNT_FALSE = "count_false"
PERCENT_FALSE = "percent_false"
COUNT_TRUE = "count_true"
PERCENT_TRUE = "percent_true"
COUNT_FALSE = "count_false" # todo not implemented
PERCENT_FALSE = "percent_false" # todo not implemented
COUNT_TRUE = "count_true" # todo not implemented
PERCENT_TRUE = "percent_true" # todo not implemented
PERCENTILE_20 = "percentile_20"
PERCENTILE_40 = "percentile_40"
PERCENTILE_60 = "percentile_60"
Expand All @@ -96,17 +96,17 @@ class ValidationFunction(str, Enum):

# Completeness validations 8
COUNT_NULL = "count_null"
COUNT_NOT_NULL = "count_not_null"
COUNT_NOT_NULL = "count_not_null" # todo not implemented
PERCENT_NULL = "percent_null"
PERCENT_NOT_NULL = "percent_not_null"
PERCENT_NOT_NULL = "percent_not_null" # todo not implemented
COUNT_EMPTY_STRING = "count_empty_string"
PERCENT_EMPTY_STRING = "percent_empty_string"
COUNT_NAN = "count_nan"
PERCENT_NAN = "percent_nan"
COUNT_NAN = "count_nan" # todo not implemented
PERCENT_NAN = "percent_nan" # todo not implemented
COUNT_ALL_SPACE = "count_all_space"
PERCENT_ALL_SPACE = "percent_all_space"
COUNT_NULL_KEYWORD = "count_null_keyword"
PERCENT_NULL_KEYWORD = "percent_null_keyboard"
PERCENT_NULL_KEYWORD = "percent_null_keyword"

# Custom SQL
CUSTOM_SQL = "custom_sql"
Expand Down
2 changes: 1 addition & 1 deletion dcs_core/core/datasource/sql_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ def query_get_null_keyword_count(
"""
qualified_table_name = self.qualified_table_name(table)

query = f""" SELECT SUM(CASE WHEN LOWER({field}) IN ('nothing', 'nil', 'null', 'none', 'n/a') THEN 1 ELSE 0 END) AS null_count,COUNT(*) AS total_count
query = f""" SELECT SUM(CASE WHEN LOWER({field}) IN ('nothing', 'nil', 'null', 'none', 'n/a', null) THEN 1 ELSE 0 END) AS null_count,COUNT(*) AS total_count
FROM {qualified_table_name}"""

if filters:
Expand Down
179 changes: 171 additions & 8 deletions dcs_core/integrations/databases/db2.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,133 @@ def _build_connection_url(self) -> str:

return url

def query_get_distinct_count(
self, table: str, field: str, filters: str = None
) -> int:
"""
Get the distinct count value
:param table: table name
:param field: column name
:param filters: filter condition
:return: distinct count as an integer
"""
qualified_table_name = self.qualified_table_name(table)
query = f"SELECT COUNT(DISTINCT CAST({field} AS VARCHAR(255))) FROM {qualified_table_name}"

if filters:
query += f" WHERE {filters}"

result = self.fetchone(query)
return result[0] if result else 0

def query_negative_metric(
self, table: str, field: str, operation: str, filters: str = None
) -> Union[int, float]:
"""
Calculate a negative metric for a specified field in a Db2 table.
:param table: table name
:param field: column name
:param operation: type of operation, "percent" or "count"
:param filters: optional filter conditions
:return: percentage of negative values if operation is "percent", otherwise count of negatives
"""
qualified_table_name = self.qualified_table_name(table)

negative_query = (
f"SELECT COUNT(*) FROM {qualified_table_name} WHERE {field} < 0"
)
if filters:
negative_query += f" AND {filters}"

total_count_query = f"SELECT COUNT(*) FROM {qualified_table_name}"
if filters:
total_count_query += f" WHERE {filters}"

if operation == "percent":
query = f"""
SELECT (CAST(({negative_query}) AS FLOAT) / NULLIF(CAST(({total_count_query}) AS FLOAT), 0)) * 100
FROM SYSIBM.SYSDUMMY1
"""
else:
query = negative_query

result = self.fetchone(query)[0]
return round(result, 2) if operation == "percent" else result

def query_get_null_keyword_count(
self, table: str, field: str, operation: str, filters: str = None
) -> Union[int, float]:
"""
Get the count of NULL-like values (specific keywords) in the specified column for IBM DB2.
:param table: table name
:param field: column name
:param operation: type of operation ('count' or 'percent')
:param filters: filter condition
:return: count or percentage of NULL-like keyword values
"""
qualified_table_name = self.qualified_table_name(table)

query = f"""
SELECT
SUM(CASE
WHEN {field} IS NULL
OR TRIM(UPPER({field})) IN ('NOTHING', 'NIL', 'NULL', 'NONE', 'N/A')
THEN 1
ELSE 0
END) AS null_count,
COUNT(*) AS total_count
FROM {qualified_table_name}
"""

if filters:
query += f" WHERE {filters}"

result = self.fetchone(query)

if not result or result[1] == 0:
return 0

if operation == "percent":
return round((result[0] or 0) / result[1] * 100, 2)

return result[0] or 0

def query_get_string_length_metric(
self, table: str, field: str, metric: str, filters: str = None
) -> Union[int, float]:
"""
Get the string length metric (max, min, avg) in a column of a table.
:param table: table name
:param field: column name
:param metric: the metric to calculate ('max', 'min', 'avg')
:param filters: filter condition
:return: the calculated metric as int for 'max' and 'min', float for 'avg'
"""
qualified_table_name = self.qualified_table_name(table)

if metric.lower() == "max":
sql_function = "MAX(LENGTH"
elif metric.lower() == "min":
sql_function = "MIN(LENGTH"
elif metric.lower() == "avg":
sql_function = "AVG(CAST(LENGTH"
else:
raise ValueError(
f"Invalid metric '{metric}'. Choose from 'max', 'min', or 'avg'."
)

if metric.lower() == "avg":
query = f'SELECT {sql_function}("{field}") AS FLOAT)) FROM {qualified_table_name}'
else:
query = f'SELECT {sql_function}("{field}")) FROM {qualified_table_name}'

if filters:
query += f" WHERE {filters}"

result = self.fetchone(query)[0]
return round(result, 2) if metric.lower() == "avg" else result

def query_string_pattern_validity(
self,
table: str,
Expand Down Expand Up @@ -97,13 +224,13 @@ def query_string_pattern_validity(
else:
regex = regex_pattern

regex_query = f"CASE WHEN REGEXP_LIKE({field}, '{regex}') THEN 1 ELSE 0 END"
regex_query = f"""
CASE WHEN REGEXP_LIKE("{field}", '{regex}') THEN 1 ELSE 0 END"""

query = f"""
SELECT SUM({regex_query}) AS valid_count, COUNT(*) AS total_count
FROM {qualified_table_name} {filters}
"""

result = self.fetchone(query)
return result[0], result[1]

Expand Down Expand Up @@ -146,10 +273,7 @@ def query_valid_invalid_values_validity(
return result[0], result[1]

def query_get_usa_state_code_validity(
self,
table: str,
field: str,
filters: str = None,
self, table: str, field: str, filters: str = None
) -> Tuple[int, int]:
"""
Get the count of valid USA state codes
Expand All @@ -167,13 +291,14 @@ def query_get_usa_state_code_validity(

qualified_table_name = self.qualified_table_name(table)

regex_query = f"CASE WHEN REGEXP_LIKE{field}, '^[A-Z]{{2}}$' AND {field} IN ({valid_state_codes_str}) THEN 1 ELSE 0 END"
regex_query = f"""
CASE WHEN REGEXP_LIKE("{field}", '^[A-Z]{{2}}$') AND UPPER("{field}") IN ({valid_state_codes_str}) THEN 1 ELSE 0 END
"""

query = f"""
SELECT SUM({regex_query}) AS valid_count, COUNT(*) AS total_count
FROM {qualified_table_name} {filters}
"""

result = self.fetchone(query)
return result[0], result[1]

Expand Down Expand Up @@ -461,3 +586,41 @@ def query_timestamp_date_not_in_future_metric(
except Exception as e:
logger.error(f"Failed to execute query: {str(e)}")
return 0, 0

def query_geolocation_metric(
self, table: str, field: str, operation: str, filters: str = None
) -> Union[int, float]:
qualified_table_name = self.qualified_table_name(table)

valid_query = f'SELECT COUNT("{field}") FROM {qualified_table_name} WHERE "{field}" IS NOT NULL AND "{field}"'

if field.lower().startswith("lat"):
valid_query += "BETWEEN -90 AND 90"
elif field.lower().startswith("lon"):
valid_query += "BETWEEN -180 AND 180"

if filters:
valid_query += f" AND {filters}"

valid_count = self.fetchone(valid_query)[0]

if operation == "percent":
total_query = f"SELECT COUNT(*) FROM {qualified_table_name}"
if filters:
total_query += f" WHERE {filters}"

total_count = self.fetchone(total_query)[0]

result = (valid_count / total_count) * 100 if total_count > 0 else 0
return round(result, 2)

return valid_count

def query_timestamp_metric(self):
raise NotImplementedError("Method not implemented for DB2DataSource")

def query_timestamp_not_in_future_metric(self):
raise NotImplementedError("Method not implemented for DB2DataSource")

def query_timestamp_date_not_in_future_metric(self):
raise NotImplementedError("Method not implemented for DB2DataSource")
Loading

0 comments on commit 6ecd77c

Please sign in to comment.