Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

22 coalesce columns into one #30

Merged
merged 8 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions lib/columnutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,13 @@ def fmt_colname(colname: str, warehouse: str):
return colname.lower()
else:
raise ValueError(f"unsupported warehouse: {warehouse}")


def quote_columnname(colname: str, warehouse: str):
"""encloses the column name within the appropriate quotes"""
if warehouse == "postgres":
return '"' + colname + '"'
elif warehouse == "bigquery":
return "`" + colname + "`"
else:
raise ValueError(f"unsupported warehouse: {warehouse}")
3 changes: 3 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
from operations.mergetables import union_tables
from operations.syncsources import sync_sources
from operations.castdatatypes import cast_datatypes
from operations.coalescecolumns import coalesce_columns

OPERATIONS_DICT = {
"flatten": flatten_operation,
"unionall": union_tables,
"syncsources": sync_sources,
"castdatatypes": cast_datatypes,
"coalescecolumns": coalesce_columns,
}

load_dotenv("dbconnection.env")
Expand Down Expand Up @@ -52,6 +54,7 @@
warehouse = config_data["warehouse"]

# run operations to generate dbt model(s)
# pylint:disable=logging-fstring-interpolation
for op_data in config_data["operations"]:
op_type = op_data["type"]
config = op_data["config"]
Expand Down
18 changes: 18 additions & 0 deletions operations.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,22 @@ operations:
config:
source_name: <top level name of the source in sources.yml file in dbt project. all tables will go under here>
source_schema: <schema of the source mentioned above>
- type: castdatatypes
config:
dest_schema: <destination schema>
input_name: <name of the input model>
output_name: <name of the output model>
columns:
- columnname: <column name>
columntype: <type to cast column to>
- type: coalescecolumns
config:
dest_schema: <destination schema>
input_name: <name of the input model>
output_name: <name of the output model>
columns:
- columnname: <first column>
- columnname: <second column>
- ...
output_column_name: <output column name>

5 changes: 3 additions & 2 deletions operations/castdatatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from logging import basicConfig, getLogger, INFO

from lib.dbtproject import dbtProject
from lib.columnutils import quote_columnname

basicConfig(level=INFO)
logger = getLogger()
Expand Down Expand Up @@ -37,11 +38,11 @@ def cast_datatypes(config: dict, warehouse: str, project_dir: str):
)
union_code += (
", CAST("
+ column["columnname"]
+ quote_columnname(column["columnname"], warehouse)
+ " AS "
+ warehouse_column_type
+ ") AS "
+ column["columnname"]
+ quote_columnname(column["columnname"], warehouse)
)

union_code += " FROM " + "{{ref('" + input_name + "')}}" + "\n"
Expand Down
44 changes: 44 additions & 0 deletions operations/coalescecolumns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""generates a model which coalesces columns"""

from logging import basicConfig, getLogger, INFO

from lib.dbtproject import dbtProject
from lib.columnutils import quote_columnname

basicConfig(level=INFO)
logger = getLogger()


# pylint:disable=unused-argument,logging-fstring-interpolation
def coalesce_columns(config: dict, warehouse: str, project_dir: str):
"""coalesces columns"""
dest_schema = config["dest_schema"]
output_name = config["output_name"]
input_name = config["input_name"]

dbtproject = dbtProject(project_dir)
dbtproject.ensure_models_dir(dest_schema)

union_code = "{{ config(materialized='table',) }}\n"

columns = config["columns"]
columnnames = [c["columnname"] for c in columns]
union_code += "SELECT {{dbt_utils.star(from=ref('" + input_name + "'), except=["
union_code += ",".join([f'"{columnname}"' for columnname in columnnames])
union_code += "])}}"

union_code += ", COALESCE("

for column in config["columns"]:
union_code += quote_columnname(column["columnname"], warehouse) + ", "
union_code = union_code[:-2] + ") AS " + config["output_column_name"]

union_code += " FROM " + "{{ref('" + input_name + "')}}" + "\n"

logger.info(f"writing dbt model {union_code}")
dbtproject.write_model(
dest_schema,
output_name,
union_code,
)
logger.info(f"dbt model {output_name} successfully created")