From d736c925e8c083c6f2b9c567dc33512049f3c2df Mon Sep 17 00:00:00 2001 From: Ishan Date: Fri, 29 Sep 2023 11:37:59 +0530 Subject: [PATCH 1/2] use ref while generating premerge table --- mergetables.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mergetables.py b/mergetables.py index 27648b3..7a43bf0 100644 --- a/mergetables.py +++ b/mergetables.py @@ -70,7 +70,7 @@ def get_columnspec(schema_: str, table_: str): else: statement += f'NULL AS "{column}",' statement = statement[:-1] # drop the final comma - statement += f" FROM {table['schema']}.{table['tablename']}" + statement += f"FROM {{{{ref('{table['tablename']}')}}}}" dbtproject.write_model( mergespec["outputsschema"], f'premerge_{table["tablename"]}', From 23b6b6c69823a93f2ab9cc2db61368c27682e639 Mon Sep 17 00:00:00 2001 From: Ishan Date: Fri, 29 Sep 2023 12:09:18 +0530 Subject: [PATCH 2/2] use source dbt function to access sources - source(src_name, table_name) instead of schema.staging - gives us more flexibility --- flattenairbyte.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flattenairbyte.py b/flattenairbyte.py index 92427d2..883e0c4 100644 --- a/flattenairbyte.py +++ b/flattenairbyte.py @@ -73,7 +73,7 @@ def mk_dbtmodel(sourcename: str, srctablename: str, columntuples: list): dbtmodel += f", _airbyte_data->>'{json_field}' as \"{sql_column}\"" dbtmodel += "\n" - dbtmodel += f"FROM {sourcename}.{srctablename}" + dbtmodel += f"FROM {{{{source('{sourcename}','{srctablename}')}}}}" return dbtmodel @@ -118,8 +118,8 @@ def mk_dbtmodel(sourcename: str, srctablename: str, columntuples: list): # and the .sql model model_sql = mk_dbtmodel( - SOURCE_SCHEMA, - tablename, + source['name'], # pass the source in the yaml file + modelname, zip(json_fields, sql_columns), ) dbtproject.write_model(DEST_SCHEMA, modelname, model_sql, logger=logger)