diff --git a/dbt_automation/assets/__init__.py b/dbt_automation/assets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dbt_automation/operations/arithmetic.py b/dbt_automation/operations/arithmetic.py index a2cd90c..6b0210a 100644 --- a/dbt_automation/operations/arithmetic.py +++ b/dbt_automation/operations/arithmetic.py @@ -72,5 +72,7 @@ def arithmetic(config: dict, warehouse: WarehouseInterface, project_dir: str): dbt_code += " FROM " + "{{ref('" + input_model + "')}}" + "\n" logger.info(f"writing dbt model {dbt_code}") - dbtproject.write_model(dest_schema, output_name, dbt_code) + model_sql_path = dbtproject.write_model(dest_schema, output_name, dbt_code) logger.info(f"dbt model {output_name} successfully created") + + return model_sql_path diff --git a/dbt_automation/operations/castdatatypes.py b/dbt_automation/operations/castdatatypes.py index 3a285d2..c740b5a 100644 --- a/dbt_automation/operations/castdatatypes.py +++ b/dbt_automation/operations/castdatatypes.py @@ -50,9 +50,11 @@ def cast_datatypes(config: dict, warehouse: WarehouseInterface, project_dir: str union_code += " FROM " + "{{ref('" + input_name + "')}}" + "\n" logger.info(f"writing dbt model {union_code}") - dbtproject.write_model( + model_sql_path = dbtproject.write_model( dest_schema, output_name, union_code, ) logger.info(f"dbt model {output_name} successfully created") + + return model_sql_path diff --git a/dbt_automation/operations/coalescecolumns.py b/dbt_automation/operations/coalescecolumns.py index cc365b7..aaf727d 100644 --- a/dbt_automation/operations/coalescecolumns.py +++ b/dbt_automation/operations/coalescecolumns.py @@ -38,9 +38,11 @@ def coalesce_columns(config: dict, warehouse: WarehouseInterface, project_dir: s union_code += " FROM " + "{{ref('" + input_name + "')}}" + "\n" logger.info(f"writing dbt model {union_code}") - dbtproject.write_model( + model_sql_path = dbtproject.write_model( dest_schema, output_name, union_code, ) logger.info(f"dbt model {output_name} successfully created") + + return model_sql_path diff --git a/dbt_automation/operations/concatcolumns.py b/dbt_automation/operations/concatcolumns.py index 0fca2de..223f592 100644 --- a/dbt_automation/operations/concatcolumns.py +++ b/dbt_automation/operations/concatcolumns.py @@ -28,13 +28,16 @@ def concat_columns(config: dict, warehouse: WarehouseInterface, project_dir: str dbt_code = "{{ config(materialized='table', schema='" + dest_schema + "') }}\n" concat_fields = ",".join( [ - quote_columnname(col["name"], warehouse.name) - if col["is_col"] in ["yes", True, "y"] - else f"'{col['name']}'" + ( + quote_columnname(col["name"], warehouse.name) + if col["is_col"] in ["yes", True, "y"] + else f"'{col['name']}'" + ) for col in columns ] ) dbt_code += f"SELECT *, CONCAT({concat_fields}) AS {output_column_name}" dbt_code += " FROM " + "{{ref('" + input_name + "')}}" + "\n" - dbtproject.write_model(dest_schema, output_name, dbt_code) + model_sql_path = dbtproject.write_model(dest_schema, output_name, dbt_code) + return model_sql_path diff --git a/dbt_automation/operations/droprenamecolumns.py b/dbt_automation/operations/droprenamecolumns.py index 5a02563..4876cf1 100644 --- a/dbt_automation/operations/droprenamecolumns.py +++ b/dbt_automation/operations/droprenamecolumns.py @@ -1,4 +1,5 @@ """drop and rename columns""" + from logging import basicConfig, getLogger, INFO from dbt_automation.utils.dbtproject import dbtProject @@ -24,7 +25,8 @@ def drop_columns(config: dict, warehouse: WarehouseInterface, project_dir: str): dbtproject = dbtProject(project_dir) dbtproject.ensure_models_dir(dest_schema) - dbtproject.write_model(dest_schema, output_model_name, model_code) + model_sql_path = dbtproject.write_model(dest_schema, output_model_name, model_code) + return model_sql_path def rename_columns(config: dict, warehouse, project_dir: str): @@ -48,4 +50,5 @@ def rename_columns(config: dict, warehouse, project_dir: str): model_code = model_code[:-2] # Remove trailing comma and space model_code += f'\nFROM \n {{{{ ref("{input_name}") }}}}' - dbtproject.write_model(dest_schema, output_model_name, model_code) + model_sql_path = dbtproject.write_model(dest_schema, output_model_name, model_code) + return model_sql_path diff --git a/dbt_automation/operations/flattenairbyte.py b/dbt_automation/operations/flattenairbyte.py index 668029a..efc82fc 100644 --- a/dbt_automation/operations/flattenairbyte.py +++ b/dbt_automation/operations/flattenairbyte.py @@ -1,4 +1,5 @@ """generates models to flatten airbyte raw data""" + import sys from logging import basicConfig, getLogger, INFO @@ -79,7 +80,8 @@ def flatten_operation(config: dict, warehouse: WarehouseInterface, project_dir: logger.info(f"completed flattening {tablename}") # finally write the yml with the models configuration - dbtproject.write_model_config(DEST_SCHEMA, models, logger=logger) + models_yml_path = dbtproject.write_model_config(DEST_SCHEMA, models, logger=logger) + return models_yml_path # ================================================================================ diff --git a/dbt_automation/operations/flattenjson.py b/dbt_automation/operations/flattenjson.py index 04b69a1..011094a 100644 --- a/dbt_automation/operations/flattenjson.py +++ b/dbt_automation/operations/flattenjson.py @@ -1,4 +1,5 @@ """pull fields out of a json field into their own columns""" + from logging import basicConfig, getLogger, INFO from dbt_automation.utils.dbtproject import dbtProject @@ -54,4 +55,5 @@ def flattenjson(config: dict, warehouse: WarehouseInterface, project_dir: str): dbtproject = dbtProject(project_dir) dbtproject.ensure_models_dir(dest_schema) - dbtproject.write_model(dest_schema, output_name, model_code) + model_sql_path = dbtproject.write_model(dest_schema, output_name, model_code) + return model_sql_path diff --git a/dbt_automation/operations/mergetables.py b/dbt_automation/operations/mergetables.py index dea7d11..91b6459 100644 --- a/dbt_automation/operations/mergetables.py +++ b/dbt_automation/operations/mergetables.py @@ -1,4 +1,5 @@ """takes a list of tables and a common column spec and creates a dbt model to merge them""" + import os import argparse @@ -50,13 +51,15 @@ def union_tables(config, warehouse: WarehouseInterface, project_dir): union_code += ")}}" logger.info(f"writing dbt model {union_code}") - dbtproject.write_model( + model_sql_path = dbtproject.write_model( dest_schema, output_model_name, union_code, ) logger.info(f"dbt model {output_model_name} successfully created") + return model_sql_path + if __name__ == "__main__": # pylint:disable=invalid-name diff --git a/dbt_automation/operations/regexextraction.py b/dbt_automation/operations/regexextraction.py index e848b59..685cb36 100644 --- a/dbt_automation/operations/regexextraction.py +++ b/dbt_automation/operations/regexextraction.py @@ -1,4 +1,5 @@ """extract from a regex""" + from dbt_automation.utils.columnutils import quote_columnname from dbt_automation.utils.dbtproject import dbtProject from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface @@ -36,4 +37,5 @@ def regex_extraction(config: dict, warehouse: WarehouseInterface, project_dir: s model_code += f'\nFROM \n {{{{ ref("{input_name}") }}}}' - dbtproject.write_model(dest_schema, output_model_name, model_code) + model_sql_path = dbtproject.write_model(dest_schema, output_model_name, model_code) + return model_sql_path diff --git a/dbt_automation/operations/scaffold.py b/dbt_automation/operations/scaffold.py index 96dfe0c..3d8dcdc 100644 --- a/dbt_automation/operations/scaffold.py +++ b/dbt_automation/operations/scaffold.py @@ -1,4 +1,5 @@ """setup the dbt project""" + import os, shutil, yaml from pathlib import Path from string import Template @@ -7,6 +8,7 @@ from dbt_automation.utils.warehouseclient import get_client from dbt_automation.utils.interfaces.warehouse_interface import WarehouseInterface +from dbt_automation import assets basicConfig(level=INFO) @@ -45,7 +47,10 @@ def scaffold(config: dict, warehouse: WarehouseInterface, project_dir: str): flatten_json_target = Path(project_dir) / "macros" / "flatten_json.sql" custom_schema_target = Path(project_dir) / "macros" / "generate_schema_name.sql" logger.info("created %s", flatten_json_target) - shutil.copy("dbt_automation/assets/generate_schema_name.sql", custom_schema_target) + source_schema_name_macro_path = os.path.abspath( + os.path.join(os.path.abspath(assets.__file__), "..", "generate_schema_name.sql") + ) + shutil.copy(source_schema_name_macro_path, custom_schema_target) logger.info("created %s", custom_schema_target) dbtproject_filename = Path(project_dir) / "dbt_project.yml" diff --git a/dbt_automation/operations/syncsources.py b/dbt_automation/operations/syncsources.py index c38a605..53d9b05 100644 --- a/dbt_automation/operations/syncsources.py +++ b/dbt_automation/operations/syncsources.py @@ -1,4 +1,5 @@ """reads tables from db to create a dbt sources.yml""" + import os import argparse from logging import basicConfig, getLogger, INFO @@ -54,6 +55,8 @@ def sync_sources(config, warehouse: WarehouseInterface, project_dir): yaml.safe_dump(merged_definitions, outfile, sort_keys=False) logger.info("wrote source definitions to %s", sources_file) + return sources_file + if __name__ == "__main__": load_dotenv("dbconnection.env") diff --git a/dbt_automation/utils/dbtproject.py b/dbt_automation/utils/dbtproject.py index 180fc78..1bb622e 100644 --- a/dbt_automation/utils/dbtproject.py +++ b/dbt_automation/utils/dbtproject.py @@ -44,6 +44,8 @@ def write_model( outfile.write(model_sql) outfile.close() + return model_filename + def write_model_config(self, schema: str, models: list, **kwargs) -> None: """writes a .yml with a models: key""" self.ensure_models_dir(schema, kwargs.get("subdir", "")) @@ -61,3 +63,5 @@ def write_model_config(self, schema: str, models: list, **kwargs) -> None: models_file, sort_keys=False, ) + + return models_filename