diff --git a/.changes/unreleased/Features-20231218-155409.yaml b/.changes/unreleased/Features-20231218-155409.yaml new file mode 100644 index 000000000..bc965b06f --- /dev/null +++ b/.changes/unreleased/Features-20231218-155409.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add support for checking table-last-modified by metadata +time: 2023-12-18T15:54:09.69635-05:00 +custom: + Author: mikealfare + Issue: "938" diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index 7d9b003b8..cf03aa2f0 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -1,8 +1,9 @@ from dataclasses import dataclass +from datetime import datetime import json import threading import time -from typing import Any, Dict, List, Optional, Type, Set, Union +from typing import Any, Dict, List, Optional, Tuple, Type, Set, Union import agate from dbt import ui # type: ignore @@ -16,7 +17,9 @@ SchemaSearchMap, available, ) +from dbt.adapters.base.impl import FreshnessResponse from dbt.adapters.cache import _make_ref_key_dict # type: ignore +from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability import dbt.clients.agate_helper from dbt.contracts.connection import AdapterResponse from dbt.contracts.graph.manifest import Manifest @@ -34,6 +37,7 @@ import google.cloud.bigquery from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable import google.cloud.exceptions +import pytz from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager from dbt.adapters.bigquery.column import get_nested_column_data_types @@ -116,6 +120,12 @@ class BigQueryAdapter(BaseAdapter): ConstraintType.foreign_key: ConstraintSupport.ENFORCED, } + _capabilities: CapabilityDict = CapabilityDict( + { + Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full), + } + ) + def __init__(self, config) -> None: super().__init__(config) self.connections: BigQueryConnectionManager = self.connections @@ -704,6 +714,26 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap: ) return result + def calculate_freshness_from_metadata( + self, + source: BaseRelation, + manifest: Optional[Manifest] = None, + ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: + conn = self.connections.get_thread_connection() + client: google.cloud.bigquery.Client = conn.handle + + table_ref = self.get_table_ref_from_relation(source) + table = client.get_table(table_ref) + snapshot = datetime.now(tz=pytz.UTC) + + freshness = FreshnessResponse( + max_loaded_at=table.modified, + snapshotted_at=snapshot, + age=(snapshot - table.modified).total_seconds(), + ) + + return None, freshness + @available.parse(lambda *a, **k: {}) def get_common_options( self, config: Dict[str, Any], node: Dict[str, Any], temporary: bool = False diff --git a/tests/boundary/test_bigquery_sdk.py b/tests/boundary/test_bigquery_sdk.py new file mode 100644 index 000000000..b8e6c9995 --- /dev/null +++ b/tests/boundary/test_bigquery_sdk.py @@ -0,0 +1,18 @@ +import pytest + +from dbt.tests.util import get_connection +from google.cloud.bigquery import Client, DatasetReference, TableReference +from google.api_core.exceptions import NotFound + + +@pytest.mark.parametrize("table_name", ["this_table_does_not_exist"]) +def test_get_table_does_not_exist(project, table_name): + """ + TODO: replace dbt project methods with direct connection instantiation + """ + with get_connection(project.adapter) as conn: + client: Client = conn.handle + dataset_ref = DatasetReference(project.database, project.test_schema) + table_ref = TableReference(dataset_ref, table_name) + with pytest.raises(NotFound): + client.get_table(table_ref) diff --git a/tests/functional/adapter/sources_freshness_tests/files.py b/tests/functional/adapter/sources_freshness_tests/files.py new file mode 100644 index 000000000..eaca96648 --- /dev/null +++ b/tests/functional/adapter/sources_freshness_tests/files.py @@ -0,0 +1,23 @@ +SCHEMA_YML = """version: 2 +sources: + - name: test_source + freshness: + warn_after: {count: 10, period: hour} + error_after: {count: 1, period: day} + schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}" + tables: + - name: test_source +""" + +SEED_TEST_SOURCE_CSV = """ +id,name +1,Martin +2,Jeter +3,Ruth +4,Gehrig +5,DiMaggio +6,Torre +7,Mantle +8,Berra +9,Maris +""".strip() diff --git a/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py b/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py new file mode 100644 index 000000000..08e263edb --- /dev/null +++ b/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py @@ -0,0 +1,30 @@ +import os +import pytest + +from dbt.tests.util import run_dbt + +from tests.functional.adapter.sources_freshness_tests import files + + +class TestGetLastRelationModified: + @pytest.fixture(scope="class") + def seeds(self): + return {"test_source.csv": files.SEED_TEST_SOURCE_CSV} + + @pytest.fixture(scope="class") + def models(self): + return {"schema.yml": files.SCHEMA_YML} + + @pytest.fixture(scope="class", autouse=True) + def setup(self, project): + # we need the schema name for the sources section + os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema + run_dbt(["seed"]) + yield + del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] + + def test_get_last_relation_modified(self, project): + results = run_dbt(["source", "freshness"]) + assert len(results) == 1 + result = results[0] + assert result.status == "pass"