Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Where filter operation #81

Merged
merged 6 commits into from
Mar 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion dbt_automation/assets/operations.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,34 @@ operations:
- <column name>
seq: < it's 1 for the above input; helps mergeoperation chaining figure out whether we want to do a left or a right join >


- type: where
config:
input:
input_type: <"source" or "model" of table1>
input_name: <name of source table or ref model table1>
source_name: <name of the source defined in source.yml; will be null for type "model" table1>
source_columns:
- <column name>
- <column name>
- <column name>
dest_schema: <destination schema>
output_name: <name of the output model>
where_type: <"and" or "or" or "sql">
clauses:
- column: <column name>
operator: <"=" or "!=" or "<" or ">" or "<=" or ">=" >
value: <value>
- column: <column name>
operator: <"=" or "!=" or "<" or ">" or "<=" or ">=" >
value: <value>
- column: <column name>
operator: <"=" or "!=" or "<" or ">" or "<=" or ">=" >
value: <value>
- column: <column name>
operator: <"=" or "!=" or "<" or ">" or "<=" or ">=" >
value: <value>
sql_snippet: < custom sql snippet; assumed to be already formatted, e.g. col1 != 5 >

- type: mergeoperations
config:
dest_schema: <destination_schema>
Expand Down Expand Up @@ -377,3 +404,24 @@ operations:
- <column name>
seq: < it's 1 for the above input; helps mergeoperation chaining figure out whether we want to do a left or a right join >

- type: where
config:
source_columns:
- <column name>
- <column name>
- <column name>
where_type: <"and" or "or" or "sql">
clauses:
- column: <column name>
operator: <"=" or "!=" or "<" or ">" or "<=" or ">=" >
value: <value>
- column: <column name>
operator: <"=" or "!=" or "<" or ">" or "<=" or ">=" >
value: <value>
- column: <column name>
operator: <"=" or "!=" or "<" or ">" or "<=" or ">=" >
value: <value>
- column: <column name>
operator: <"=" or "!=" or "<" or ">" or "<=" or ">=" >
value: <value>
sql_snippet: < custom sql snippet; assumed to be already formatted, e.g. col1 != 5 >
2 changes: 0 additions & 2 deletions dbt_automation/operations/coalescecolumns.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,4 @@ def coalesce_columns(config: dict, warehouse: WarehouseInterface, project_dir: s
dest_schema = config["dest_schema"]
model_sql_path = dbt_project.write_model(dest_schema, output_name, dbt_sql)

logger.info("output model:", model_sql_path)

return model_sql_path, output_cols
6 changes: 5 additions & 1 deletion dbt_automation/operations/mergeoperations.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from dbt_automation.operations.castdatatypes import cast_datatypes_sql
from dbt_automation.operations.replace import replace_dbt_sql
from dbt_automation.operations.joins import joins_sql
from dbt_automation.utils.tableutils import source_or_ref
from dbt_automation.operations.wherefilter import where_filter_sql


def merge_operations_sql(
Expand Down Expand Up @@ -92,6 +92,10 @@ def merge_operations_sql(
)
elif operation["type"] == "join":
op_select_statement, out_cols = joins_sql(operation["config"], warehouse)
elif operation["type"] == "where":
op_select_statement, out_cols = where_filter_sql(
operation["config"], warehouse
)

output_cols = out_cols

Expand Down
134 changes: 134 additions & 0 deletions dbt_automation/operations/wherefilter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""generates a model which filters using the sql where clause"""

from logging import basicConfig, getLogger, INFO

from dbt_automation.utils.dbtproject import dbtProject
from dbt_automation.utils.columnutils import quote_columnname, quote_constvalue
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface
from dbt_automation.utils.tableutils import source_or_ref

basicConfig(level=INFO)
logger = getLogger()

# sql, columns = wherefilter.where_filter_sql({
# "source_columns": ["salinity", "Arsenic"],
# "input": {
# "input_type": "source",
# "source_name": "destinations_v2",
# "input_name": "Sheet2",
# },
# "where_type": "and",
# "clauses": [
# {
# "column": "Iron",
# "operator": ">=",
# "value": "0"
# },
# {
# "column": "Nitrate",
# "operator": "<>",
# "value": "50"
# }
# ]
# }, wc_client)
#
# SELECT
# `salinity`,
# `Arsenic`
# FROM {{source('destinations_v2', 'Sheet2')}}
# WHERE (`Iron` >= '0' AND `Nitrate` <> '50' )
#
# sql, columns = wherefilter.where_filter_sql({
# "source_columns": ["salinity", "Arsenic"],
# "input": {
# "input_type": "source",
# "source_name": "destinations_v2",
# "input_name": "Sheet2",
# },
# "where_type": "sql",
# "sql_snippet": "CAST(`Iron` AS INT64) > CAST(`Nitrate` AS INT64)"
# }, wc_client)
#
# SELECT
# `salinity`,
# `Arsenic`
# FROM {{source('destinations_v2', 'Sheet2')}}
# WHERE (CAST(`Iron` AS INT64) > CAST(`Nitrate` AS INT64))


# pylint:disable=unused-argument,logging-fstring-interpolation
def where_filter_sql(
    config: dict,
    warehouse: WarehouseInterface,
):
    """
    Generate the SELECT ... FROM ... WHERE (...) statement for the where operation.

    Keys read from ``config``:
      - ``source_columns``: column names to select; returned unchanged as the
        output columns.
      - ``input``: keyword arguments forwarded to ``source_or_ref``; must
        contain ``input_type``. For ``"cte"`` the result is used as-is in the
        FROM clause, otherwise it is wrapped in ``{{ ... }}``.
      - ``where_type``: ``"and"``, ``"or"`` or ``"sql"``.
      - ``clauses``: list of ``{"column", "operator", "value"}`` dicts, combined
        with AND/OR when ``where_type`` is ``"and"``/``"or"``.
      - ``sql_snippet``: condition inserted verbatim when ``where_type`` is
        ``"sql"``.

    Returns a ``(sql, source_columns)`` tuple.

    Raises ``ValueError`` when no condition is supplied, when ``where_type`` is
    not one of the supported values, or when the input matching ``where_type``
    (``clauses`` or ``sql_snippet``) is empty. Each of those cases used to
    produce the invalid SQL ``WHERE ()``.
    """
    source_columns = config["source_columns"]
    input_table = config["input"]
    # `or []` / `or ""` also cover keys that are present but null in the YAML.
    clauses: list = config.get("clauses") or []
    clause_type: str = config.get("where_type", "")  # and, or, sql
    sql_snippet: str = config.get("sql_snippet") or ""

    # Validate before building anything so a bad config fails fast instead of
    # yielding a model that only breaks when dbt runs it.
    if not clauses and not sql_snippet:
        raise ValueError("No where clause provided")
    if clause_type in ("and", "or"):
        if not clauses:
            raise ValueError(
                f"where_type '{clause_type}' requires at least one clause"
            )
    elif clause_type == "sql":
        if not sql_snippet:
            raise ValueError("where_type 'sql' requires a sql_snippet")
    else:
        raise ValueError(
            f"Unsupported where_type {clause_type!r}; expected 'and', 'or' or 'sql'"
        )

    select_list = ",\n".join(
        quote_columnname(col_name, warehouse.name) for col_name in source_columns
    )

    select_from = source_or_ref(**input_table)
    if input_table["input_type"] == "cte":
        from_clause = f"FROM {select_from}\n"
    else:
        from_clause = f"FROM {{{{{select_from}}}}}\n"

    # NOTE(review): the operator and sql_snippet are interpolated verbatim;
    # this assumes the config comes from a trusted source.
    if clause_type == "sql":
        condition = f"{sql_snippet}"
    else:
        # The trailing space after each clause is kept so the generated SQL
        # stays identical to what this function produced before.
        condition = f" {clause_type.upper()} ".join(
            f"{quote_columnname(clause['column'], warehouse.name)} "
            f"{clause['operator']} "
            f"{quote_constvalue(str(clause['value']), warehouse.name)} "
            for clause in clauses
        )

    dbt_code = "SELECT\n" + select_list + "\n" + from_clause + "WHERE (" + condition + ")"

    return dbt_code, source_columns


def where_filter(config: dict, warehouse: WarehouseInterface, project_dir: str):
    """
    Build the where-filter SELECT statement and write it out as a dbt model.

    Unless the input is a CTE, the model starts with a dbt ``config`` header
    that materializes it as a table in ``config["dest_schema"]``. The model is
    written under ``dest_schema`` with the name ``config["output_name"]``.

    Returns a ``(model_sql_path, output_cols)`` tuple, where ``output_cols``
    is whatever ``where_filter_sql`` reports as the output columns.
    """
    is_cte_input = config["input"]["input_type"] == "cte"
    if is_cte_input:
        header = ""
    else:
        header = (
            "{{ config(materialized='table', schema='" + config["dest_schema"] + "') }}"
        )

    select_statement, output_cols = where_filter_sql(config, warehouse)
    model_sql = header + "\n" + select_statement

    project = dbtProject(project_dir)
    project.ensure_models_dir(config["dest_schema"])

    model_name = config["output_name"]
    schema = config["dest_schema"]
    model_sql_path = project.write_model(schema, model_name, model_sql)

    return model_sql_path, output_cols
2 changes: 2 additions & 0 deletions scripts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dbt_automation.operations.regexextraction import regex_extraction
from dbt_automation.operations.replace import replace
from dbt_automation.operations.joins import join
from dbt_automation.operations.wherefilter import where_filter

OPERATIONS_DICT = {
"flatten": flatten_operation,
Expand All @@ -40,6 +41,7 @@
"mergeoperations": merge_operations,
"replace": replace,
"join": join,
"where": where_filter,
}

load_dotenv("./../dbconnection.env")
Expand Down
Loading