diff --git a/flattenairbyte.py b/flattenairbyte.py index 92427d2..883e0c4 100644 --- a/flattenairbyte.py +++ b/flattenairbyte.py @@ -73,7 +73,7 @@ def mk_dbtmodel(sourcename: str, srctablename: str, columntuples: list): dbtmodel += f", _airbyte_data->>'{json_field}' as \"{sql_column}\"" dbtmodel += "\n" - dbtmodel += f"FROM {sourcename}.{srctablename}" + dbtmodel += f"FROM {{{{source('{sourcename}','{srctablename}')}}}}" return dbtmodel @@ -118,8 +118,8 @@ def mk_dbtmodel(sourcename: str, srctablename: str, columntuples: list): # and the .sql model model_sql = mk_dbtmodel( - SOURCE_SCHEMA, - tablename, + source['name'], # pass the source in the yaml file + modelname, zip(json_fields, sql_columns), ) dbtproject.write_model(DEST_SCHEMA, modelname, model_sql, logger=logger) diff --git a/mergetables.py b/mergetables.py index cec7051..80adc46 100644 --- a/mergetables.py +++ b/mergetables.py @@ -81,7 +81,7 @@ def get_columnspec(schema_: str, table_: str): else: statement += f'NULL AS "{column}",' statement = statement[:-1] # drop the final comma - statement += f" FROM {table['schema']}.{table['tablename']}" + statement += f"FROM {{{{ref('{table['tablename']}')}}}}" dbtproject.write_model( mergespec["outputsschema"], f'premerge_{table["tablename"]}',