Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update operations to return model info #54

Merged
merged 3 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
4 changes: 3 additions & 1 deletion dbt_automation/operations/arithmetic.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,7 @@ def arithmetic(config: dict, warehouse: WarehouseInterface, project_dir: str):
dbt_code += " FROM " + "{{ref('" + input_model + "')}}" + "\n"

logger.info(f"writing dbt model {dbt_code}")
dbtproject.write_model(dest_schema, output_name, dbt_code)
model_sql_path = dbtproject.write_model(dest_schema, output_name, dbt_code)
logger.info(f"dbt model {output_name} successfully created")

return model_sql_path
4 changes: 3 additions & 1 deletion dbt_automation/operations/castdatatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ def cast_datatypes(config: dict, warehouse: WarehouseInterface, project_dir: str
union_code += " FROM " + "{{ref('" + input_name + "')}}" + "\n"

logger.info(f"writing dbt model {union_code}")
dbtproject.write_model(
model_sql_path = dbtproject.write_model(
dest_schema,
output_name,
union_code,
)
logger.info(f"dbt model {output_name} successfully created")

return model_sql_path
4 changes: 3 additions & 1 deletion dbt_automation/operations/coalescecolumns.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ def coalesce_columns(config: dict, warehouse: WarehouseInterface, project_dir: s
union_code += " FROM " + "{{ref('" + input_name + "')}}" + "\n"

logger.info(f"writing dbt model {union_code}")
dbtproject.write_model(
model_sql_path = dbtproject.write_model(
dest_schema,
output_name,
union_code,
)
logger.info(f"dbt model {output_name} successfully created")

return model_sql_path
11 changes: 7 additions & 4 deletions dbt_automation/operations/concatcolumns.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ def concat_columns(config: dict, warehouse: WarehouseInterface, project_dir: str
dbt_code = "{{ config(materialized='table', schema='" + dest_schema + "') }}\n"
concat_fields = ",".join(
[
quote_columnname(col["name"], warehouse.name)
if col["is_col"] in ["yes", True, "y"]
else f"'{col['name']}'"
(
quote_columnname(col["name"], warehouse.name)
if col["is_col"] in ["yes", True, "y"]
else f"'{col['name']}'"
)
for col in columns
]
)
dbt_code += f"SELECT *, CONCAT({concat_fields}) AS {output_column_name}"
dbt_code += " FROM " + "{{ref('" + input_name + "')}}" + "\n"

dbtproject.write_model(dest_schema, output_name, dbt_code)
model_sql_path = dbtproject.write_model(dest_schema, output_name, dbt_code)
return model_sql_path
7 changes: 5 additions & 2 deletions dbt_automation/operations/droprenamecolumns.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""drop and rename columns"""

from logging import basicConfig, getLogger, INFO

from dbt_automation.utils.dbtproject import dbtProject
Expand All @@ -24,7 +25,8 @@ def drop_columns(config: dict, warehouse: WarehouseInterface, project_dir: str):

dbtproject = dbtProject(project_dir)
dbtproject.ensure_models_dir(dest_schema)
dbtproject.write_model(dest_schema, output_model_name, model_code)
model_sql_path = dbtproject.write_model(dest_schema, output_model_name, model_code)
return model_sql_path


def rename_columns(config: dict, warehouse, project_dir: str):
Expand All @@ -48,4 +50,5 @@ def rename_columns(config: dict, warehouse, project_dir: str):
model_code = model_code[:-2] # Remove trailing comma and space
model_code += f'\nFROM \n {{{{ ref("{input_name}") }}}}'

dbtproject.write_model(dest_schema, output_model_name, model_code)
model_sql_path = dbtproject.write_model(dest_schema, output_model_name, model_code)
return model_sql_path
4 changes: 3 additions & 1 deletion dbt_automation/operations/flattenairbyte.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""generates models to flatten airbyte raw data"""

import sys
from logging import basicConfig, getLogger, INFO

Expand Down Expand Up @@ -79,7 +80,8 @@ def flatten_operation(config: dict, warehouse: WarehouseInterface, project_dir:
logger.info(f"completed flattening {tablename}")

# finally write the yml with the models configuration
dbtproject.write_model_config(DEST_SCHEMA, models, logger=logger)
models_yml_path = dbtproject.write_model_config(DEST_SCHEMA, models, logger=logger)
return models_yml_path


# ================================================================================
Expand Down
4 changes: 3 additions & 1 deletion dbt_automation/operations/flattenjson.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""pull fields out of a json field into their own columns"""

from logging import basicConfig, getLogger, INFO

from dbt_automation.utils.dbtproject import dbtProject
Expand Down Expand Up @@ -54,4 +55,5 @@ def flattenjson(config: dict, warehouse: WarehouseInterface, project_dir: str):

dbtproject = dbtProject(project_dir)
dbtproject.ensure_models_dir(dest_schema)
dbtproject.write_model(dest_schema, output_name, model_code)
model_sql_path = dbtproject.write_model(dest_schema, output_name, model_code)
return model_sql_path
5 changes: 4 additions & 1 deletion dbt_automation/operations/mergetables.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""takes a list of tables and a common column spec and creates a dbt model to merge them"""

import os

import argparse
Expand Down Expand Up @@ -50,13 +51,15 @@ def union_tables(config, warehouse: WarehouseInterface, project_dir):
union_code += ")}}"

logger.info(f"writing dbt model {union_code}")
dbtproject.write_model(
model_sql_path = dbtproject.write_model(
dest_schema,
output_model_name,
union_code,
)
logger.info(f"dbt model {output_model_name} successfully created")

return model_sql_path


if __name__ == "__main__":
# pylint:disable=invalid-name
Expand Down
4 changes: 3 additions & 1 deletion dbt_automation/operations/regexextraction.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""extract from a regex"""

from dbt_automation.utils.columnutils import quote_columnname
from dbt_automation.utils.dbtproject import dbtProject
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface
Expand Down Expand Up @@ -36,4 +37,5 @@ def regex_extraction(config: dict, warehouse: WarehouseInterface, project_dir: s

model_code += f'\nFROM \n {{{{ ref("{input_name}") }}}}'

dbtproject.write_model(dest_schema, output_model_name, model_code)
model_sql_path = dbtproject.write_model(dest_schema, output_model_name, model_code)
return model_sql_path
7 changes: 6 additions & 1 deletion dbt_automation/operations/scaffold.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""setup the dbt project"""

import os, shutil, yaml
from pathlib import Path
from string import Template
Expand All @@ -7,6 +8,7 @@

from dbt_automation.utils.warehouseclient import get_client
from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface
from dbt_automation import assets


basicConfig(level=INFO)
Expand Down Expand Up @@ -45,7 +47,10 @@ def scaffold(config: dict, warehouse: WarehouseInterface, project_dir: str):
flatten_json_target = Path(project_dir) / "macros" / "flatten_json.sql"
custom_schema_target = Path(project_dir) / "macros" / "generate_schema_name.sql"
logger.info("created %s", flatten_json_target)
shutil.copy("dbt_automation/assets/generate_schema_name.sql", custom_schema_target)
source_schema_name_macro_path = os.path.abspath(
os.path.join(os.path.abspath(assets.__file__), "..", "generate_schema_name.sql")
)
shutil.copy(source_schema_name_macro_path, custom_schema_target)
logger.info("created %s", custom_schema_target)

dbtproject_filename = Path(project_dir) / "dbt_project.yml"
Expand Down
3 changes: 3 additions & 0 deletions dbt_automation/operations/syncsources.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""reads tables from db to create a dbt sources.yml"""

import os
import argparse
from logging import basicConfig, getLogger, INFO
Expand Down Expand Up @@ -54,6 +55,8 @@ def sync_sources(config, warehouse: WarehouseInterface, project_dir):
yaml.safe_dump(merged_definitions, outfile, sort_keys=False)
logger.info("wrote source definitions to %s", sources_file)

return sources_file


if __name__ == "__main__":
load_dotenv("dbconnection.env")
Expand Down
4 changes: 4 additions & 0 deletions dbt_automation/utils/dbtproject.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ def write_model(
outfile.write(model_sql)
outfile.close()

return model_filename

def write_model_config(self, schema: str, models: list, **kwargs) -> None:
"""writes a .yml with a models: key"""
self.ensure_models_dir(schema, kwargs.get("subdir", ""))
Expand All @@ -61,3 +63,5 @@ def write_model_config(self, schema: str, models: list, **kwargs) -> None:
models_file,
sort_keys=False,
)

return models_filename
Loading