From 29adc72765f940c31647197c002e5e9c55841526 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Mon, 18 Dec 2023 15:54:24 -0500 Subject: [PATCH 1/6] changelog --- .changes/unreleased/Features-20231218-155409.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20231218-155409.yaml diff --git a/.changes/unreleased/Features-20231218-155409.yaml b/.changes/unreleased/Features-20231218-155409.yaml new file mode 100644 index 000000000..bc965b06f --- /dev/null +++ b/.changes/unreleased/Features-20231218-155409.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add support for checking table-last-modified by metadata +time: 2023-12-18T15:54:09.69635-05:00 +custom: + Author: mikealfare + Issue: "938" From ee64b0e846927b6b59181d4fcd68816e43318e04 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Mon, 18 Dec 2023 17:31:58 -0500 Subject: [PATCH 2/6] turn on metadata-based source freshness capability --- dbt/adapters/bigquery/impl.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index 7d9b003b8..585979164 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -17,6 +17,7 @@ available, ) from dbt.adapters.cache import _make_ref_key_dict # type: ignore +from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability import dbt.clients.agate_helper from dbt.contracts.connection import AdapterResponse from dbt.contracts.graph.manifest import Manifest @@ -116,6 +117,12 @@ class BigQueryAdapter(BaseAdapter): ConstraintType.foreign_key: ConstraintSupport.ENFORCED, } + _capabilities: CapabilityDict = CapabilityDict( + { + Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full), + } + ) + def __init__(self, config) -> None: super().__init__(config) self.connections: BigQueryConnectionManager = self.connections From af8d60f8e1e4e697d39cd217bae1cec914b946a3 Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Mon, 18 Dec 2023 21:19:52 -0500 Subject: [PATCH 3/6] add boundary test to confirm get_table raises an error properly --- tests/boundary/test_bigquery_sdk.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 tests/boundary/test_bigquery_sdk.py diff --git a/tests/boundary/test_bigquery_sdk.py b/tests/boundary/test_bigquery_sdk.py new file mode 100644 index 000000000..52e8db311 --- /dev/null +++ b/tests/boundary/test_bigquery_sdk.py @@ -0,0 +1,20 @@ +import pytest + +from dbt.tests.util import get_connection +from google.cloud.bigquery import Client, DatasetReference, TableReference +from google.api_core.exceptions import NotFound + + +class TestBigQuerySDK: + """ + TODO: replace dbt project methods with direct connection instantiation + """ + + @pytest.mark.parametrize("table_name", ["this_table_does_not_exist"]) + def test_get_table_does_not_exist(self, project, table_name): + with get_connection(project.adapter) as conn: + client: Client = conn.handle + dataset_ref = DatasetReference(project.database, project.test_schema) + table_ref = TableReference(dataset_ref, table_name) + with pytest.raises(NotFound): + client.get_table(table_ref) From faeadb4ec8664b85295885b359164d8cbb959dfc Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Mon, 18 Dec 2023 21:21:48 -0500 Subject: [PATCH 4/6] add metadata-based source freshness for a relation --- dbt/adapters/bigquery/impl.py | 25 +++++++++++++++- .../adapter/sources_freshness_tests/files.py | 23 ++++++++++++++ .../test_get_relation_last_modified.py | 30 +++++++++++++++++++ 3 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 tests/functional/adapter/sources_freshness_tests/files.py create mode 100644 tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index 585979164..cf03aa2f0 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -1,8 +1,9 @@ from dataclasses import dataclass +from datetime import datetime import json import threading import time -from typing import Any, Dict, List, Optional, Type, Set, Union +from typing import Any, Dict, List, Optional, Tuple, Type, Set, Union import agate from dbt import ui # type: ignore @@ -16,6 +17,7 @@ SchemaSearchMap, available, ) +from dbt.adapters.base.impl import FreshnessResponse from dbt.adapters.cache import _make_ref_key_dict # type: ignore from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability import dbt.clients.agate_helper @@ -35,6 +37,7 @@ import google.cloud.bigquery from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable import google.cloud.exceptions +import pytz from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager from dbt.adapters.bigquery.column import get_nested_column_data_types @@ -711,6 +714,26 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap: ) return result + def calculate_freshness_from_metadata( + self, + source: BaseRelation, + manifest: Optional[Manifest] = None, + ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: + conn = self.connections.get_thread_connection() + client: google.cloud.bigquery.Client = conn.handle + + table_ref = self.get_table_ref_from_relation(source) + table = client.get_table(table_ref) + snapshot = datetime.now(tz=pytz.UTC) + + freshness = FreshnessResponse( + max_loaded_at=table.modified, + snapshotted_at=snapshot, + age=(snapshot - table.modified).total_seconds(), + ) + + return None, freshness + @available.parse(lambda *a, **k: {}) def get_common_options( self, config: Dict[str, Any], node: Dict[str, Any], temporary: bool = False diff --git a/tests/functional/adapter/sources_freshness_tests/files.py b/tests/functional/adapter/sources_freshness_tests/files.py new file mode 100644 index 000000000..eaca96648 --- /dev/null +++ b/tests/functional/adapter/sources_freshness_tests/files.py @@ -0,0 +1,23 @@ +SCHEMA_YML = """version: 2 +sources: + - name: test_source + freshness: + warn_after: {count: 10, period: hour} + error_after: {count: 1, period: day} + schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}" + tables: + - name: test_source +""" + +SEED_TEST_SOURCE_CSV = """ +id,name +1,Martin +2,Jeter +3,Ruth +4,Gehrig +5,DiMaggio +6,Torre +7,Mantle +8,Berra +9,Maris +""".strip() diff --git a/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py b/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py new file mode 100644 index 000000000..a43911f2d --- /dev/null +++ b/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py @@ -0,0 +1,30 @@ +import os +import pytest + +from dbt.tests.util import run_dbt + +from tests.functional.adapter.sources_freshness_tests import files + + +class TestGetLastRelationModified: + @pytest.fixture(scope="class") + def seeds(self): + return {"test_source.csv": files.SEED_TEST_SOURCE_CSV} + + @pytest.fixture(scope="class") + def models(self): + return {"schema.yml": files.SCHEMA_YML} + + @pytest.fixture(scope="class", autouse=True) + def setup(self, project): + # we need the schema name for the sources section + os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema + run_dbt(["seed"]) + yield + del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] + + def test_get_last_relation_modified(self, project, setup): + results = run_dbt(["source", "freshness"]) + assert len(results) == 1 + result = results[0] + assert result.status == "pass" From d03e743cf5c4e87f30cf44a67380a793e56de15b Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Mon, 18 Dec 2023 21:24:09 -0500 Subject: [PATCH 5/6] remove unnecessary test setup --- tests/boundary/test_bigquery_sdk.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/tests/boundary/test_bigquery_sdk.py b/tests/boundary/test_bigquery_sdk.py index 52e8db311..b8e6c9995 100644 --- a/tests/boundary/test_bigquery_sdk.py +++ b/tests/boundary/test_bigquery_sdk.py @@ -5,16 +5,14 @@ from google.api_core.exceptions import NotFound -class TestBigQuerySDK: +@pytest.mark.parametrize("table_name", ["this_table_does_not_exist"]) +def test_get_table_does_not_exist(project, table_name): """ TODO: replace dbt project methods with direct connection instantiation """ - - @pytest.mark.parametrize("table_name", ["this_table_does_not_exist"]) - def test_get_table_does_not_exist(self, project, table_name): - with get_connection(project.adapter) as conn: - client: Client = conn.handle - dataset_ref = DatasetReference(project.database, project.test_schema) - table_ref = TableReference(dataset_ref, table_name) - with pytest.raises(NotFound): - client.get_table(table_ref) + with get_connection(project.adapter) as conn: + client: Client = conn.handle + dataset_ref = DatasetReference(project.database, project.test_schema) + table_ref = TableReference(dataset_ref, table_name) + with pytest.raises(NotFound): + client.get_table(table_ref) From 1edadf70cabeb60a08308b9a45979115eba4db3f Mon Sep 17 00:00:00 2001 From: Mike Alfare Date: Tue, 19 Dec 2023 13:45:58 -0500 Subject: [PATCH 6/6] remove unnecessary fixture from test --- .../sources_freshness_tests/test_get_relation_last_modified.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py b/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py index a43911f2d..08e263edb 100644 --- a/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py +++ b/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py @@ -23,7 +23,7 @@ def setup(self, project): yield del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] - def test_get_last_relation_modified(self, project, setup): + def test_get_last_relation_modified(self, project): results = run_dbt(["source", "freshness"]) assert len(results) == 1 result = results[0]