Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

58 combining operations using cte #62

Merged
merged 42 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
1d3f84d
refactor arithmetic operation
Abhishek-N Feb 16, 2024
a5b479c
castdatatypes for merge operations
Abhishek-N Feb 16, 2024
eaddb2b
coalescecolumns to merge operations
Abhishek-N Feb 16, 2024
dd58f50
concatcolumns to refactor for merge operations
Abhishek-N Feb 16, 2024
0800926
drop columns and rename columns refactor
Abhishek-N Feb 16, 2024
1c2650a
flatten json refactor for merging
Abhishek-N Feb 16, 2024
340a497
merge operations to perform cte and add operations in a single dbt file
Abhishek-N Feb 16, 2024
8d614af
refactor regex_extraction to perform merge
Abhishek-N Feb 16, 2024
00236ae
include merge in main
Abhishek-N Feb 16, 2024
4473840
cleanup wip
Abhishek-N Feb 16, 2024
bbc68b0
further refactor
Abhishek-N Feb 16, 2024
1d0ebf2
add cte condition to each to show config header
Abhishek-N Feb 16, 2024
959eac6
update yaml template
Abhishek-N Feb 16, 2024
f808ef1
add cte to source or ref
Abhishek-N Feb 16, 2024
f30a4a4
fix merge function to handle different types
Abhishek-N Feb 16, 2024
993941c
update source or ref for cte
Abhishek-N Feb 16, 2024
254959a
add config header
Abhishek-N Feb 16, 2024
41af507
refactor to show with in first query
Abhishek-N Feb 16, 2024
b3ac932
fix typo
Feb 17, 2024
d51c2f3
remove unused import
Feb 17, 2024
3e016ee
missing "select"
Feb 17, 2024
bd8504a
Merge branch 'main' into 58-combining-operations-using-cte
Feb 17, 2024
719e8e8
add source schema to the first cte
Abhishek-N Feb 18, 2024
8ab51d9
Merge branch 'main' into 58-combining-operations-using-cte
Abhishek-N Feb 18, 2024
6c7aa4c
Resolved merge conflicts
Abhishek-N Feb 18, 2024
0d756dd
config update on arithmetic
Abhishek-N Feb 20, 2024
dca1b5e
remove dbt star and update castdatatypes
Abhishek-N Feb 20, 2024
8381c1d
remove dbt_utils.star and update coalesce
Abhishek-N Feb 20, 2024
c4b1924
fix concat columns to work with merge
Abhishek-N Feb 20, 2024
928e599
remove dbt_utils.star for drop and rename column
Abhishek-N Feb 20, 2024
de82806
fix overall merge
Abhishek-N Feb 20, 2024
19f2467
update regex extraction
Abhishek-N Feb 20, 2024
e5dca20
operations.yaml.template update
Abhishek-N Feb 20, 2024
7c1df1e
flatten json with source columns
Abhishek-N Feb 20, 2024
dc37e9e
refactor on configs
Abhishek-N Feb 20, 2024
44e64ac
fixes for existing tests
Abhishek-N Feb 21, 2024
3fd3712
tests for merge
Abhishek-N Feb 23, 2024
4fab83b
concat fix and update tests
Abhishek-N Feb 23, 2024
4a94dc7
updates file structure for pytest
Abhishek-N Feb 24, 2024
93c4d65
pylint
Feb 24, 2024
e8bc0e5
don't pass the config to the _sql functions
Feb 24, 2024
c0d6350
Delete compare-dbt-outputs.md
fatchat Feb 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions dbt_automation/assets/operations.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ operations:
source_name: <name of the source defined in source.yml; will be null for type "model">
dest_schema: <destination schema>
output_name: <name of the output model>
source_columns:
- <column name>
- <column name>
- <column name>
- ...
columns_to_copy:
- column1
- column2
Expand All @@ -52,6 +57,11 @@ operations:
source_name: <name of the source defined in source.yml; will be null for type "model">
dest_schema: <destination schema>
output_name: <name of the output model>
source_columns:
- <column name>
- <column name>
- <column name>
- ...
columns:
- columnname: <column name>
columntype: <type to cast column to>
Expand All @@ -63,6 +73,11 @@ operations:
source_name: <name of the source defined in source.yml; will be null for type "model">
dest_schema: <destination schema>
output_name: <name of the output model>
source_columns:
- <column name>
- <column name>
- <column name>
- ...
columns:
- columnname: <first column>
- columnname: <second column>
Expand All @@ -76,6 +91,11 @@ operations:
source_name: <name of the source defined in source.yml; will be null for type "model">
dest_schema: <destination schema>
output_name: <name of the output model>
source_columns:
- <column name>
- <column name>
- <column name>
- ...
columns:
- name: <string (column or const)>
is_col: <yes or no>
Expand All @@ -94,6 +114,11 @@ operations:
output_name: <enter the name of the output model>
dest_schema: <enter your destination/output schema>
operator: <can be "add", "sub", "mul", "div">
source_columns:
- <column name>
- <column name>
- <column name>
- ...
operands:
- <name of the 1st column>
- <name of the 2nd column or a const value>
Expand All @@ -107,6 +132,11 @@ operations:
source_name: <name of the source defined in source.yml; will be null for type "model">
dest_schema: <destination schema>
output_name: <name of the output model>
source_columns:
- <column name>
- <column name>
- <column name>
- ...
columns:
- <column name>
- type: renamecolumns
Expand All @@ -117,6 +147,11 @@ operations:
source_name: <name of the source defined in source.yml; will be null for type "model">
dest_schema: <destination schema>
output_name: <name of the output model>
source_columns:
- <column name>
- <column name>
- <column name>
- ...
columns:
<old column name>: <new column name>
- type: regexextraction
Expand All @@ -127,6 +162,157 @@ operations:
source_name: <name of the source defined in source.yml; will be null for type "model">
dest_schema: <destination schema>
output_name: <name of the output model>
source_columns:
- <column name>
- <column name>
- <column name>
- ...
columns:
column1: <regex operation>
column2: <regex operation>

- type: mergeoperations
config:
dest_schema: <destination_schema>
output_name: mergeoperation
source_name: <name of the source defined in source.yml; will be null for type "model">
operations:
- type: castdatatypes
config:
input:
input_type: <source_type>
input_name: <source_name>
source_name: <source_name>
dest_schema: <intermediate_schema>
output_name: cast_output
source_columns:
- <column name>
- <column name>
- <column name>
- ...
columns:
- columnname: <column_name>
columntype: <column_type>
- type: arithmetic
config:
input:
input_type: <source_type>
input_name: <source_name>
source_name: <source_name>
output_name: arithmetic_output
dest_schema: <intermediate_schema>
operator: <operator>
source_columns:
- <column name>
- <column name>
- <column name>
- ...
operands:
- <operand1>
- <operand2>
- <operand3>
output_column_name: <output_column_name>
- type: coalescecolumns
config:
input:
input_type: <source_type>
input_name: <source_name>
source_name: <source_name>
dest_schema: <intermediate_schema>
output_name: coalesce_columns
source_columns:
- <column name>
- <column name>
- <column name>
- ...
columns:
- columnname: <column_name>
- columnname: <column_name>
output_column_name: <output_column_name>
- type: concat
config:
input:
input_type: <source_type>
input_name: <source_name>
source_name: <source_name>
dest_schema: <intermediate_schema>
output_name: concat
source_columns:
- <column name>
- <column name>
- <column name>
- ...
columns:
- name: <column_name>
is_col: <yes_or_no>
- name: <column_name>
is_col: <yes_or_no>
- name: <column_name>
is_col: <yes_or_no>
output_column_name: <output_column_name>
- type: dropcolumns
config:
input:
input_type: <source_type>
input_name: <source_name>
source_name: <source_name>
dest_schema: <intermediate_schema>
output_name: drop_column
source_columns:
- <column name>
- <column name>
- <column name>
- ...
columns:
- <column_name>
- type: renamecolumns
config:
input:
input_type: <source_type>
input_name: <source_name>
source_name: <source_name>
dest_schema: <intermediate_schema>
output_name: rename_column
source_columns:
- <column name>
- <column name>
- <column name>
- ...
columns:
<old_column_name>: <new_column_name>
- type: flattenjson
config:
input:
input_type: <source_type>
input_name: <source_name>
source_name: <source_name>
dest_schema: <intermediate_schema>
output_name: flatten_json
source_columns:
- <column name>
- <column name>
- <column name>
- ...
columns_to_copy:
- <column_name>
- <column_name>
json_column: <json_column>
json_columns_to_copy:
- <json_column_name>
- <json_column_name>
- type: regexextraction
config:
input:
input_type: <source_type>
input_name: <source_name>
source_name: <source_name>
dest_schema: <intermediate_schema>
output_name: regex_extraction
source_columns:
- <column name>
- <column name>
- <column name>
- ...
columns:
<column_name>: <regex_pattern>
<column_name>: <regex_pattern>
63 changes: 43 additions & 20 deletions dbt_automation/operations/arithmetic.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,42 @@
from logging import basicConfig, getLogger, INFO
from dbt_automation.utils.dbtproject import dbtProject
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface
from dbt_automation.utils.columnutils import quote_columnname

from dbt_automation.utils.tableutils import source_or_ref

basicConfig(level=INFO)
logger = getLogger()


# pylint:disable=unused-argument,logging-fstring-interpolation
def arithmetic(config: dict, warehouse: WarehouseInterface, project_dir: str):
def arithmetic_dbt_sql(config: dict, warehouse: WarehouseInterface):
"""
performs arithmetic operations: +/-/*//
config["input"] is dict {"source_name": "", "input_name": "", "input_type": ""}
"""
output_name = config["output_name"]
dest_schema = config["dest_schema"]
operator = config["operator"]
operands = config["operands"]
output_col_name = config["output_column_name"]
source_columns = config.get("source_columns", [])

if operator not in ["add", "sub", "mul", "div"]:
raise ValueError("unknown operation")
raise ValueError("Unknown operation")

if len(operands) <= 1:
raise ValueError("need atleast two operands to perform operations")
if len(operands) < 2:
raise ValueError("At least two operands are required to perform operations")

if operator == "div" and (len(operands) != 2):
raise ValueError("division needs exactly two operands")
if operator == "div" and len(operands) != 2:
raise ValueError("Division requires exactly two operands")

# setup the dbt project
dbtproject = dbtProject(project_dir)
dbtproject.ensure_models_dir(dest_schema)
dbt_code = "SELECT "

dbt_code = "{{ config(materialized='table',schema='" + dest_schema + "') }}\n"
dbt_code += ", ".join(
[quote_columnname(col, warehouse.name) for col in source_columns]
)

if operator == "add":
dbt_code += "SELECT *,"
dbt_code += ","
dbt_code += "{{dbt_utils.safe_add(["
for operand in operands:
dbt_code += f"'{str(operand)}',"
Expand All @@ -48,14 +49,14 @@ def arithmetic(config: dict, warehouse: WarehouseInterface, project_dir: str):
dbt_code += f" AS {output_col_name} "

if operator == "mul":
dbt_code += "SELECT *,"
dbt_code += ","
for operand in operands:
dbt_code += f"{operand} * "
dbt_code = dbt_code[:-2]
dbt_code += f" AS {output_col_name} "

if operator == "sub":
dbt_code += "SELECT *,"
dbt_code += ","
dbt_code += "{{dbt_utils.safe_subtract(["
for operand in operands:
dbt_code += f"'{str(operand)}',"
Expand All @@ -64,17 +65,39 @@ def arithmetic(config: dict, warehouse: WarehouseInterface, project_dir: str):
dbt_code += f" AS {output_col_name} "

if operator == "div":
dbt_code += "SELECT *,"
dbt_code += ","
dbt_code += "{{dbt_utils.safe_divide("
for operand in operands:
dbt_code += f"'{str(operand)}',"
dbt_code += ")}}"
dbt_code += f" AS {output_col_name} "

dbt_code += " FROM " + "{{" + source_or_ref(**config["input"]) + "}}" + "\n"
select_from = source_or_ref(**config["input"])
if config["input"]["input_type"] == "cte":
dbt_code += "\n FROM " + select_from + "\n"
else:
dbt_code += "\n FROM " + "{{" + select_from + "}}" + "\n"

return dbt_code


def arithmetic(config: dict, warehouse: WarehouseInterface, project_dir: str):
"""
Perform arithmetic operations and generate a DBT model.
"""
config_sql = ""
if config["input"]["input_type"] != "cte":
config_sql = (
"{{ config(materialized='table', schema='" + config["dest_schema"] + "') }}"
)

sql = config_sql + "\n" + arithmetic_dbt_sql(config, warehouse)

dbtproject = dbtProject(project_dir)
dbtproject.ensure_models_dir(config["dest_schema"])

logger.info(f"writing dbt model {dbt_code}")
model_sql_path = dbtproject.write_model(dest_schema, output_name, dbt_code)
logger.info(f"dbt model {output_name} successfully created")
model_sql_path = dbtproject.write_model(
config["dest_schema"], config["output_name"], sql
)

return model_sql_path
Loading
Loading