Skip to content

Commit

Permalink
update dbt-adapters
Browse files Browse the repository at this point in the history
  • Loading branch information
aranke committed Jun 18, 2024
1 parent f01a675 commit b59b0df
Showing 1 changed file with 44 additions and 64 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import pytest

from dbt_common.contracts.metadata import (
TableMetadata,
StatsItem,
CatalogTable,
ColumnMetadata,
)
from dbt.tests.util import run_dbt, get_connection

models__my_model_sql = """
models__my_table_model_sql = """
select * from {{ ref('my_seed') }}
"""


models__my_view_model_sql = """
{{
config(
materialized='view',
Expand Down Expand Up @@ -37,71 +36,52 @@ def seeds(self):
@pytest.fixture(scope="class")
def models(self):
return {
"my_model.sql": models__my_model_sql,
"my_view_model.sql": models__my_view_model_sql,
"my_table_model.sql": models__my_table_model_sql,
}

def test_get_catalog_for_single_relation(self, project):
results = run_dbt(["seed"])
assert len(results) == 1

results = run_dbt(["run"])
assert len(results) == 1
@pytest.fixture(scope="class")
def expected_catalog_my_seed(self, project):
raise NotImplementedError(
"To use this test, please implement `get_catalog_for_single_relation`, inherited from `SQLAdapter`."
)

expected = CatalogTable(
metadata=TableMetadata(
type="VIEW",
schema=project.test_schema.upper(),
name="MY_MODEL",
database=project.database,
comment="",
owner="TESTER",
),
columns={
"ID": ColumnMetadata(type="NUMBER", index=1, name="ID", comment=None),
"FIRST_NAME": ColumnMetadata(
type="VARCHAR", index=2, name="FIRST_NAME", comment=None
),
"EMAIL": ColumnMetadata(type="VARCHAR", index=3, name="EMAIL", comment=None),
"IP_ADDRESS": ColumnMetadata(
type="VARCHAR", index=4, name="IP_ADDRESS", comment=None
),
"UPDATED_AT": ColumnMetadata(
type="TIMESTAMP_NTZ", index=5, name="UPDATED_AT", comment=None
),
},
stats={
"has_stats": StatsItem(
id="has_stats",
label="Has Stats?",
value=True,
include=False,
description="Indicates whether there are statistics for this table",
),
"row_count": StatsItem(
id="row_count",
label="Row Count",
value=0,
include=True,
description="Number of rows in the table as reported by Snowflake",
),
"bytes": StatsItem(
id="bytes",
label="Approximate Size",
value=0,
include=True,
description="Size of the table as reported by Snowflake",
),
},
unique_id=None,
@pytest.fixture(scope="class")
def expected_catalog_my_model(self, project):
raise NotImplementedError(
"To use this test, please implement `get_catalog_for_single_relation`, inherited from `SQLAdapter`."
)

my_model_relation = project.adapter.get_relation(
def get_relation_for_identifier(self, project, identifier):
return project.adapter.get_relation(
database=project.database,
schema=project.test_schema,
identifier="MY_MODEL",
identifier=identifier,
)

def test_get_catalog_for_single_relation(
self, project, expected_catalog_my_seed, expected_catalog_my_view_model
):
results = run_dbt(["seed"])
assert len(results) == 1

my_seed_relation = self.get_relation_for_identifier(project, "my_seed")

with get_connection(project.adapter):
actual_catalog_my_seed = project.adapter.get_catalog_for_single_relation(
my_seed_relation
)

assert actual_catalog_my_seed == expected_catalog_my_seed

results = run_dbt(["run"])
assert len(results) == 2

my_view_model_relation = self.get_relation_for_identifier(project, "my_view_model")

with get_connection(project.adapter):
actual = project.adapter.get_catalog_for_single_relation(my_model_relation)
actual_catalog_my_view_model = project.adapter.get_catalog_for_single_relation(
my_view_model_relation
)

assert actual == expected
assert actual_catalog_my_view_model == expected_catalog_my_view_model

0 comments on commit b59b0df

Please sign in to comment.