diff --git a/CHANGELOG.md b/CHANGELOG.md index 081a617d610..f7ea88d24ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - Add deprecation warnings to anonymous usage tracking ([#2688](https://github.com/fishtown-analytics/dbt/issues/2688), [#2710](https://github.com/fishtown-analytics/dbt/issues/2710)) ### Features +- Add a BigQuery adapter macro to enable usage of CopyJobs ([#2709](https://github.com/fishtown-analytics/dbt/pull/2709)) - Support TTL for BigQuery tables([#2711](https://github.com/fishtown-analytics/dbt/pull/2711)) - Add better retry support when using the BigQuery adapter ([#2694](https://github.com/fishtown-analytics/dbt/pull/2694), follow-up to [#1963](https://github.com/fishtown-analytics/dbt/pull/1963)) - Added a `dispatch` method to the context adapter and deprecated `adapter_macro`. ([#2302](https://github.com/fishtown-analytics/dbt/issues/2302), [#2679](https://github.com/fishtown-analytics/dbt/pull/2679)) @@ -30,7 +31,7 @@ Contributors: - [@bbhoss](https://github.com/bbhoss) ([#2677](https://github.com/fishtown-analytics/dbt/pull/2677)) -- [@kconvey](https://github.com/kconvey) ([#2694](https://github.com/fishtown-analytics/dbt/pull/2694), [#2711], (https://github.com/fishtown-analytics/dbt/pull/2711)) +- [@kconvey](https://github.com/kconvey) ([#2694](https://github.com/fishtown-analytics/dbt/pull/2694), [#2709](https://github.com/fishtown-analytics/dbt/pull/2709), [#2711](https://github.com/fishtown-analytics/dbt/pull/2711)) - [@vogt4nick](https://github.com/vogt4nick) ([#2702](https://github.com/fishtown-analytics/dbt/issues/2702)) - [@stephen8chang](https://github.com/stephen8chang) ([docs#106](https://github.com/fishtown-analytics/dbt-docs/pull/106), [docs#108](https://github.com/fishtown-analytics/dbt-docs/pull/108), [docs#113](https://github.com/fishtown-analytics/dbt-docs/pull/113)) - [@rsenseman](https://github.com/rsenseman) ([#2708](https://github.com/fishtown-analytics/dbt/pull/2708)) diff --git 
a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index a64435aecaf..5fb2857a043 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -26,6 +26,8 @@ BQ_QUERY_JOB_SPLIT = '-----Query Job SQL Follows-----' +WRITE_TRUNCATE = google.cloud.bigquery.job.WriteDisposition.WRITE_TRUNCATE + REOPENABLE_ERRORS = ( ConnectionResetError, ConnectionError, @@ -336,7 +338,7 @@ def create_table(self, database, schema, table_name, sql): table_ref = self.table_ref(database, schema, table_name, conn) job_params = {'destination': table_ref, - 'write_disposition': 'WRITE_TRUNCATE'} + 'write_disposition': WRITE_TRUNCATE} timeout = self.get_timeout(conn) @@ -352,6 +354,32 @@ def callback(table): self.create_bigquery_table(database, schema, table_name, callback, 'CREATE DAY PARTITIONED TABLE') + def copy_bq_table(self, source, destination, write_disposition): + conn = self.get_thread_connection() + client = conn.handle + + source_ref = self.table_ref( + source.database, source.schema, source.table, conn) + destination_ref = self.table_ref( + destination.database, destination.schema, destination.table, conn) + + logger.debug( + 'Copying table "{}" to "{}" with disposition: "{}"', + source_ref.path, destination_ref.path, write_disposition) + + def copy_and_results(): + job_config = google.cloud.bigquery.CopyJobConfig( + write_disposition=write_disposition) + copy_job = client.copy_table( + source_ref, destination_ref, job_config=job_config) + iterator = copy_job.result(timeout=self.get_timeout(conn)) + return copy_job, iterator + + self._retry_and_handle( + msg='copy table "{}" to "{}"'.format( + source_ref.path, destination_ref.path), + conn=conn, fn=copy_and_results) + @staticmethod def dataset(database, schema, conn): dataset_ref = conn.handle.dataset(schema, database) diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py 
b/plugins/bigquery/dbt/adapters/bigquery/impl.py index cbd4bd6b69f..3d6bea5320a 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -32,6 +32,10 @@ import agate import json +# Write dispositions for bigquery. +WRITE_APPEND = google.cloud.bigquery.job.WriteDisposition.WRITE_APPEND +WRITE_TRUNCATE = google.cloud.bigquery.job.WriteDisposition.WRITE_TRUNCATE + def sql_escape(string): if not isinstance(string, str): @@ -417,6 +421,23 @@ def _materialize_as_table( return "CREATE TABLE" + @available.parse(lambda *a, **k: '') + def copy_table(self, source, destination, materialization): + if materialization == 'incremental': + write_disposition = WRITE_APPEND + elif materialization == 'table': + write_disposition = WRITE_TRUNCATE + else: + dbt.exceptions.raise_compiler_error( + 'Copy table materialization must be "incremental" or "table", but ' + f"config.get('copy_materialization', 'table') was " + f'{materialization}') + + self.connections.copy_bq_table( + source, destination, write_disposition) + + return "COPY TABLE with materialization: {}".format(materialization) + @classmethod def poll_until_job_completes(cls, job, timeout): retry_count = timeout diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/copy.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/copy.sql new file mode 100644 index 00000000000..7b28975efbe --- /dev/null +++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/copy.sql @@ -0,0 +1,39 @@ +{% materialization copy, adapter='bigquery' -%} + + {# Setup #} + {{ run_hooks(pre_hooks) }} + + {# there should be exactly one ref or exactly one source #} + {% set destination = this.incorporate(type='table') %} + + {% set dependency_type = none %} + {% if (model.refs | length) == 1 and (model.sources | length) == 0 %} + {% set dependency_type = 'ref' %} + {% elif (model.refs | length) == 0 and (model.sources | length) == 1 %} + {% set dependency_type = 
'source' %} + {% else %} + {% set msg %} + Expected exactly one ref or exactly one source, instead got {{ model.refs | length }} models and {{ model.sources | length }} sources. + {% endset %} + {% do exceptions.raise_compiler_error(msg) %} + {% endif %} + + {% if dependency_type == 'ref' %} + {% set src = ref(*model.refs[0]) %} + {% else %} + {% set src = source(*model.sources[0]) %} + {% endif %} + + {%- set result_str = adapter.copy_table( + src, + destination, + config.get('copy_materialization', 'table')) -%} + + {{ store_result('main', status=result_str) }} + + {# Clean up #} + {{ run_hooks(post_hooks) }} + {{ adapter.commit() }} + + {{ return({'relations': [destination]}) }} +{%- endmaterialization %} diff --git a/test/integration/022_bigquery_test/copy-failing-models/copy_bad_materialization.sql b/test/integration/022_bigquery_test/copy-failing-models/copy_bad_materialization.sql new file mode 100644 index 00000000000..b6093645d56 --- /dev/null +++ b/test/integration/022_bigquery_test/copy-failing-models/copy_bad_materialization.sql @@ -0,0 +1,2 @@ +{{ config(copy_materialization='view') }} +{{ ref('original') }} \ No newline at end of file diff --git a/test/integration/022_bigquery_test/copy-failing-models/original.sql b/test/integration/022_bigquery_test/copy-failing-models/original.sql new file mode 100644 index 00000000000..26d9cae7b5b --- /dev/null +++ b/test/integration/022_bigquery_test/copy-failing-models/original.sql @@ -0,0 +1 @@ +select 1 as id \ No newline at end of file diff --git a/test/integration/022_bigquery_test/copy-models/copy_as_incremental.sql b/test/integration/022_bigquery_test/copy-models/copy_as_incremental.sql new file mode 100644 index 00000000000..bbe8e5acd2e --- /dev/null +++ b/test/integration/022_bigquery_test/copy-models/copy_as_incremental.sql @@ -0,0 +1,2 @@ +{{ config(copy_materialization='incremental') }} +{{ ref('original') }} \ No newline at end of file diff --git 
a/test/integration/022_bigquery_test/copy-models/copy_as_table.sql b/test/integration/022_bigquery_test/copy-models/copy_as_table.sql new file mode 100644 index 00000000000..4dda068ac3a --- /dev/null +++ b/test/integration/022_bigquery_test/copy-models/copy_as_table.sql @@ -0,0 +1,2 @@ +{{ config(copy_materialization='table') }} +{{ ref('original') }} \ No newline at end of file diff --git a/test/integration/022_bigquery_test/copy-models/original.sql b/test/integration/022_bigquery_test/copy-models/original.sql new file mode 100644 index 00000000000..26d9cae7b5b --- /dev/null +++ b/test/integration/022_bigquery_test/copy-models/original.sql @@ -0,0 +1 @@ +select 1 as id \ No newline at end of file diff --git a/test/integration/022_bigquery_test/test_bigquery_copy_failing_models.py b/test/integration/022_bigquery_test/test_bigquery_copy_failing_models.py new file mode 100644 index 00000000000..a3cd8877200 --- /dev/null +++ b/test/integration/022_bigquery_test/test_bigquery_copy_failing_models.py @@ -0,0 +1,36 @@ +from test.integration.base import DBTIntegrationTest, use_profile +import textwrap +import yaml + + +class TestBigqueryCopyTableFails(DBTIntegrationTest): + + @property + def schema(self): + return "bigquery_test_022" + + @property + def models(self): + return "copy-failing-models" + + @property + def profile_config(self): + return self.bigquery_profile() + + @property + def project_config(self): + return yaml.safe_load(textwrap.dedent('''\ + config-version: 2 + models: + test: + original: + materialized: table + copy_bad_materialization: + materialized: copy + ''')) + + @use_profile('bigquery') + def test__bigquery_copy_table_fails(self): + results = self.run_dbt(expect_pass=False) + self.assertEqual(len(results), 2) + self.assertTrue(results[1].error) diff --git a/test/integration/022_bigquery_test/test_bigquery_copy_models.py b/test/integration/022_bigquery_test/test_bigquery_copy_models.py new file mode 100644 index 00000000000..00028357faf --- 
/dev/null +++ b/test/integration/022_bigquery_test/test_bigquery_copy_models.py @@ -0,0 +1,37 @@ +from test.integration.base import DBTIntegrationTest, use_profile +import textwrap +import yaml + + +class TestBigqueryCopyTable(DBTIntegrationTest): + + @property + def schema(self): + return "bigquery_test_022" + + @property + def models(self): + return "copy-models" + + @property + def profile_config(self): + return self.bigquery_profile() + + @property + def project_config(self): + return yaml.safe_load(textwrap.dedent('''\ + config-version: 2 + models: + test: + original: + materialized: table + copy_as_table: + materialized: copy + copy_as_incremental: + materialized: copy + ''')) + + @use_profile('bigquery') + def test__bigquery_copy_table(self): + results = self.run_dbt() + self.assertEqual(len(results), 3) diff --git a/test/unit/test_bigquery_adapter.py b/test/unit/test_bigquery_adapter.py index 837283b3107..dd1f1e1bc89 100644 --- a/test/unit/test_bigquery_adapter.py +++ b/test/unit/test_bigquery_adapter.py @@ -4,7 +4,7 @@ import unittest from contextlib import contextmanager from requests.exceptions import ConnectionError -from unittest.mock import patch, MagicMock, Mock, create_autospec +from unittest.mock import patch, MagicMock, Mock, create_autospec, ANY import hologram @@ -21,8 +21,9 @@ import dbt.exceptions from dbt.logger import GLOBAL_LOGGER as logger # noqa -from .utils import config_from_parts_or_dicts, inject_adapter, TestAdapterConversions +import google.cloud.bigquery +from .utils import config_from_parts_or_dicts, inject_adapter, TestAdapterConversions def _bq_conn(): conn = MagicMock() @@ -516,8 +517,65 @@ def test_query_and_results(self, mock_bq): self.mock_client.query.assert_called_once_with( 'sql', job_config=mock_bq.QueryJobConfig()) + + def test_copy_bq_table_appends(self): + self._copy_table( + write_disposition=dbt.adapters.bigquery.impl.WRITE_APPEND) + args, kwargs = self.mock_client.copy_table.call_args + 
self.mock_client.copy_table.assert_called_once_with( + self._table_ref('project', 'dataset', 'table1', None), + self._table_ref('project', 'dataset', 'table2', None), + job_config=ANY) + args, kwargs = self.mock_client.copy_table.call_args + self.assertEqual( + kwargs['job_config'].write_disposition, + dbt.adapters.bigquery.impl.WRITE_APPEND) + + def test_copy_bq_table_truncates(self): + self._copy_table( + write_disposition=dbt.adapters.bigquery.impl.WRITE_TRUNCATE) + args, kwargs = self.mock_client.copy_table.call_args + self.mock_client.copy_table.assert_called_once_with( + self._table_ref('project', 'dataset', 'table1', None), + self._table_ref('project', 'dataset', 'table2', None), + job_config=ANY) + args, kwargs = self.mock_client.copy_table.call_args + self.assertEqual( + kwargs['job_config'].write_disposition, + dbt.adapters.bigquery.impl.WRITE_TRUNCATE) + + def _table_ref(self, proj, ds, table, conn): + return google.cloud.bigquery.table.TableReference.from_string( + '{}.{}.{}'.format(proj, ds, table)) + + def _copy_table(self, write_disposition): + + self.connections.table_ref = self._table_ref + source = BigQueryRelation.create( + database='project', schema='dataset', identifier='table1') + destination = BigQueryRelation.create( + database='project', schema='dataset', identifier='table2') + self.connections.copy_bq_table(source, destination, write_disposition) + + +class TestBigQueryAdapter(BaseTestBigQueryAdapter): + + def test_copy_table_materialization_table(self): + adapter = self.get_adapter('oauth') + adapter.connections = MagicMock() + adapter.copy_table('source', 'destination', 'table') + adapter.connections.copy_bq_table.assert_called_once_with( + 'source', 'destination', + dbt.adapters.bigquery.impl.WRITE_TRUNCATE) + + def test_copy_table_materialization_incremental(self): + adapter = self.get_adapter('oauth') + adapter.connections = MagicMock() + adapter.copy_table('source', 'destination', 'incremental') + 
adapter.connections.copy_bq_table.assert_called_once_with( + 'source', 'destination', + dbt.adapters.bigquery.impl.WRITE_APPEND) -class TestBigQueryTableOptions(BaseTestBigQueryAdapter): def test_parse_partition_by(self): adapter = self.get_adapter('oauth')