Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a BigQuery adapter macro to enable usage of CopyJobs #2709

Merged
merged 25 commits into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Add deprecation warnings to anonymous usage tracking ([#2688](https://github.com/fishtown-analytics/dbt/issues/2688), [#2710](https://github.com/fishtown-analytics/dbt/issues/2710))

### Features
- Add a BigQuery adapter macro to enable usage of CopyJobs ([#2709](https://github.com/fishtown-analytics/dbt/pull/2709))
- Add better retry support when using the BigQuery adapter ([#2694](https://github.com/fishtown-analytics/dbt/pull/2694), follow-up to [#1963](https://github.com/fishtown-analytics/dbt/pull/1963))
- Added a `dispatch` method to the context adapter and deprecated `adapter_macro`. ([#2302](https://github.com/fishtown-analytics/dbt/issues/2302), [#2679](https://github.com/fishtown-analytics/dbt/pull/2679))
- The built-in schema tests now use `adapter.dispatch`, so they can be overridden for adapter plugins ([#2415](https://github.com/fishtown-analytics/dbt/issues/2415), [#2684](https://github.com/fishtown-analytics/dbt/pull/2684))
Expand All @@ -28,7 +29,7 @@

Contributors:
- [@bbhoss](https://github.com/bbhoss) ([#2677](https://github.com/fishtown-analytics/dbt/pull/2677))
- [@kconvey](https://github.com/kconvey) ([#2694](https://github.com/fishtown-analytics/dbt/pull/2694))
- [@kconvey](https://github.com/kconvey) ([#2694](https://github.com/fishtown-analytics/dbt/pull/2694), [#2709](https://github.com/fishtown-analytics/dbt/pull/2709))
- [@vogt4nick](https://github.com/vogt4nick) ([#2702](https://github.com/fishtown-analytics/dbt/issues/2702))
- [@stephen8chang](https://github.com/stephen8chang) ([docs#106](https://github.com/fishtown-analytics/dbt-docs/pull/106), [docs#108](https://github.com/fishtown-analytics/dbt-docs/pull/108), [docs#113](https://github.com/fishtown-analytics/dbt-docs/pull/113))

Expand Down
30 changes: 29 additions & 1 deletion plugins/bigquery/dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

BQ_QUERY_JOB_SPLIT = '-----Query Job SQL Follows-----'

WRITE_TRUNCATE = google.cloud.bigquery.job.WriteDisposition.WRITE_TRUNCATE

REOPENABLE_ERRORS = (
ConnectionResetError,
ConnectionError,
Expand Down Expand Up @@ -336,7 +338,7 @@ def create_table(self, database, schema, table_name, sql):

table_ref = self.table_ref(database, schema, table_name, conn)
job_params = {'destination': table_ref,
'write_disposition': 'WRITE_TRUNCATE'}
'write_disposition': WRITE_TRUNCATE}

timeout = self.get_timeout(conn)

Expand All @@ -352,6 +354,32 @@ def callback(table):
self.create_bigquery_table(database, schema, table_name, callback,
'CREATE DAY PARTITIONED TABLE')

def copy_bq_table(self, source, destination, write_disposition):
    """Copy ``source`` into ``destination`` via a BigQuery CopyJob.

    Runs on the current thread's connection, blocks until the job
    finishes (bounded by the configured job timeout), and goes through
    the shared retry handler.
    """
    conn = self.get_thread_connection()
    client = conn.handle

    src_ref = self.table_ref(
        source.database, source.schema, source.table, conn)
    dst_ref = self.table_ref(
        destination.database, destination.schema, destination.table, conn)

    logger.debug(
        'Copying table "{}" to "{}" with disposition: "{}"',
        src_ref.path, dst_ref.path, write_disposition)

    def copy_and_results():
        # Submit the copy job with the requested disposition, then wait
        # for it to complete before handing back the job and its result.
        job_config = google.cloud.bigquery.CopyJobConfig(
            write_disposition=write_disposition)
        job = client.copy_table(src_ref, dst_ref, job_config=job_config)
        return job, job.result(timeout=self.get_timeout(conn))

    self._retry_and_handle(
        msg='copy table "{}" to "{}"'.format(src_ref.path, dst_ref.path),
        conn=conn, fn=copy_and_results)

@staticmethod
def dataset(database, schema, conn):
dataset_ref = conn.handle.dataset(schema, database)
Expand Down
21 changes: 21 additions & 0 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
import agate
import json

# Write dispositions for bigquery.
WRITE_APPEND = google.cloud.bigquery.job.WriteDisposition.WRITE_APPEND
WRITE_TRUNCATE = google.cloud.bigquery.job.WriteDisposition.WRITE_TRUNCATE


def sql_escape(string):
if not isinstance(string, str):
Expand Down Expand Up @@ -416,6 +420,23 @@ def _materialize_as_table(

return "CREATE TABLE"

@available.parse(lambda *a, **k: '')
def copy_table(self, source, destination, materialization):
    """Copy ``source`` into ``destination`` using a BigQuery CopyJob.

    :param source: relation to copy from.
    :param destination: relation to copy into.
    :param materialization: the model's ``copy_materialization`` config;
        ``'incremental'`` appends, ``'table'`` truncates-and-replaces.
    :returns: a status string describing the copy.
    :raises: a compiler error for any other materialization value.
    """
    if materialization == 'incremental':
        write_disposition = WRITE_APPEND
    elif materialization == 'table':
        write_disposition = WRITE_TRUNCATE
    else:
        # Fixed message: the accepted values are 'incremental' and
        # 'table' (checked above), not 'copy'.
        dbt.exceptions.raise_compiler_error(
            'Copy table materialization must be "incremental" or "table", '
            f"but config.get('copy_materialization', 'table') was "
            f'{materialization}')

    self.connections.copy_bq_table(
        source, destination, write_disposition)

    return "COPY TABLE with materialization: {}".format(materialization)

@classmethod
def poll_until_job_completes(cls, job, timeout):
retry_count = timeout
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{% materialization copy, adapter='bigquery' -%}
{#
  BigQuery-only materialization that produces the target table via a
  server-side CopyJob (adapter.copy_table) instead of running the model's
  SQL. The model must depend on exactly one ref OR exactly one source,
  which becomes the copy's source relation. The write disposition is
  derived from the `copy_materialization` config (default: 'table').
#}

{# Setup #}
{{ run_hooks(pre_hooks) }}

{# there should be exactly one ref or exactly one source #}
{% set destination = this.incorporate(type='table') %}

{% set dependency_type = none %}
{% if (model.refs | length) == 1 and (model.sources | length) == 0 %}
{% set dependency_type = 'ref' %}
{% elif (model.refs | length) == 0 and (model.sources | length) == 1 %}
{% set dependency_type = 'source' %}
{% else %}
{% set msg %}
Expected exactly one ref or exactly one source, instead got {{ model.refs | length }} models and {{ model.sources | length }} sources.
{% endset %}
{% do exceptions.raise_compiler_error(msg) %}
{% endif %}

{# Resolve the single dependency into a relation to copy from #}
{% if dependency_type == 'ref' %}
{% set src = ref(*model.refs[0]) %}
{% else %}
{% set src = source(*model.sources[0]) %}
{% endif %}

{# Run the CopyJob; copy_table raises a compiler error for invalid copy_materialization values #}
{%- set result_str = adapter.copy_table(
src,
destination,
config.get('copy_materialization', 'table')) -%}

{{ store_result('main', status=result_str) }}

{# Clean up #}
{{ run_hooks(post_hooks) }}
{{ adapter.commit() }}

{{ return({'relations': [destination]}) }}
{%- endmaterialization %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{# Deliberately invalid: 'view' is not an accepted copy_materialization
   (only 'table' or 'incremental'), so running this model should fail. #}
{{ config(copy_materialization='view') }}
{{ ref('original') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Seed model: the source table that the copy-materialization models copy.
select 1 as id
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{# Copies ref('original') with an appending (WRITE_APPEND) CopyJob. #}
{{ config(copy_materialization='incremental') }}
{{ ref('original') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{# Copies ref('original') with a truncate-and-replace (WRITE_TRUNCATE) CopyJob. #}
{{ config(copy_materialization='table') }}
{{ ref('original') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Seed model: the source table that the copy-materialization models copy.
select 1 as id
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from test.integration.base import DBTIntegrationTest, use_profile
import textwrap
import yaml


class TestBigqueryCopyTableFails(DBTIntegrationTest):
    """A `copy` model with an invalid copy_materialization must fail."""

    @property
    def schema(self):
        return "bigquery_test_022"

    @property
    def models(self):
        return "copy-failing-models"

    @property
    def profile_config(self):
        return self.bigquery_profile()

    @property
    def project_config(self):
        # Same configuration as the YAML form, written as a plain dict.
        return {
            'config-version': 2,
            'models': {
                'test': {
                    'original': {'materialized': 'table'},
                    'copy_bad_materialization': {'materialized': 'copy'},
                },
            },
        }

    @use_profile('bigquery')
    def test__bigquery_copy_table_fails(self):
        results = self.run_dbt(expect_pass=False)
        self.assertEqual(len(results), 2)
        # The copy model runs after `original` and should carry the error.
        self.assertTrue(results[1].error)
37 changes: 37 additions & 0 deletions test/integration/022_bigquery_test/test_bigquery_copy_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from test.integration.base import DBTIntegrationTest, use_profile
import textwrap
import yaml


class TestBigqueryCopyTable(DBTIntegrationTest):
    """Happy-path runs of the `copy` materialization (table + incremental)."""

    @property
    def schema(self):
        return "bigquery_test_022"

    @property
    def models(self):
        return "copy-models"

    @property
    def profile_config(self):
        return self.bigquery_profile()

    @property
    def project_config(self):
        # Same configuration as the YAML form, written as a plain dict.
        return {
            'config-version': 2,
            'models': {
                'test': {
                    'original': {'materialized': 'table'},
                    'copy_as_table': {'materialized': 'copy'},
                    'copy_as_incremental': {'materialized': 'copy'},
                },
            },
        }

    @use_profile('bigquery')
    def test__bigquery_copy_table(self):
        results = self.run_dbt()
        # original + both copy models should all succeed.
        self.assertEqual(len(results), 3)
64 changes: 61 additions & 3 deletions test/unit/test_bigquery_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import unittest
from contextlib import contextmanager
from requests.exceptions import ConnectionError
from unittest.mock import patch, MagicMock, Mock
from unittest.mock import patch, MagicMock, Mock, ANY

import hologram

Expand All @@ -21,8 +21,9 @@
import dbt.exceptions
from dbt.logger import GLOBAL_LOGGER as logger # noqa

from .utils import config_from_parts_or_dicts, inject_adapter, TestAdapterConversions
import google.cloud.bigquery

from .utils import config_from_parts_or_dicts, inject_adapter, TestAdapterConversions

def _bq_conn():
conn = MagicMock()
Expand Down Expand Up @@ -516,8 +517,65 @@ def test_query_and_results(self, mock_bq):
self.mock_client.query.assert_called_once_with(
'sql', job_config=mock_bq.QueryJobConfig())


def test_copy_bq_table_appends(self):
    """copy_bq_table('incremental' path) must build a CopyJobConfig with
    WRITE_APPEND and pass it to the client's copy_table call."""
    self._copy_table(
        write_disposition=dbt.adapters.bigquery.impl.WRITE_APPEND)
    self.mock_client.copy_table.assert_called_once_with(
        self._table_ref('project', 'dataset', 'table1', None),
        self._table_ref('project', 'dataset', 'table2', None),
        job_config=ANY)
    # job_config can't be compared with == above, so inspect it directly.
    # (Removed a duplicate, unused call_args assignment that preceded the
    # assert_called_once_with call.)
    _, kwargs = self.mock_client.copy_table.call_args
    self.assertEqual(
        kwargs['job_config'].write_disposition,
        dbt.adapters.bigquery.impl.WRITE_APPEND)

def test_copy_bq_table_truncates(self):
    """copy_bq_table('table' path) must build a CopyJobConfig with
    WRITE_TRUNCATE and pass it to the client's copy_table call."""
    self._copy_table(
        write_disposition=dbt.adapters.bigquery.impl.WRITE_TRUNCATE)
    self.mock_client.copy_table.assert_called_once_with(
        self._table_ref('project', 'dataset', 'table1', None),
        self._table_ref('project', 'dataset', 'table2', None),
        job_config=ANY)
    # job_config can't be compared with == above, so inspect it directly.
    # (Removed a duplicate, unused call_args assignment that preceded the
    # assert_called_once_with call.)
    _, kwargs = self.mock_client.copy_table.call_args
    self.assertEqual(
        kwargs['job_config'].write_disposition,
        dbt.adapters.bigquery.impl.WRITE_TRUNCATE)

def _table_ref(self, proj, ds, table, conn):
    # Deterministic stand-in for BigQueryConnectionManager.table_ref;
    # `conn` is accepted to match that signature but is unused.
    return google.cloud.bigquery.table.TableReference.from_string(
        f'{proj}.{ds}.{table}')

def _copy_table(self, write_disposition):
    # Swap in the deterministic table_ref stub so the copy call can be
    # asserted against real TableReference objects, then drive a copy
    # between two fixed relations in the same project/dataset.
    self.connections.table_ref = self._table_ref
    src = BigQueryRelation.create(
        database='project', schema='dataset', identifier='table1')
    dst = BigQueryRelation.create(
        database='project', schema='dataset', identifier='table2')
    self.connections.copy_bq_table(src, dst, write_disposition)


class TestBigQueryAdapter(BaseTestBigQueryAdapter):
    """Unit tests for BigQueryAdapter.copy_table's materialization mapping."""

    def _invoke_copy(self, materialization):
        # Build an adapter with a mocked connection manager, call
        # copy_table, and hand back the mock for assertions.
        adapter = self.get_adapter('oauth')
        adapter.connections = MagicMock()
        adapter.copy_table('source', 'destination', materialization)
        return adapter.connections

    def test_copy_table_materialization_table(self):
        connections = self._invoke_copy('table')
        connections.copy_bq_table.assert_called_once_with(
            'source', 'destination',
            dbt.adapters.bigquery.impl.WRITE_TRUNCATE)

    def test_copy_table_materialization_incremental(self):
        connections = self._invoke_copy('incremental')
        connections.copy_bq_table.assert_called_once_with(
            'source', 'destination',
            dbt.adapters.bigquery.impl.WRITE_APPEND)

class TestBigQueryTableOptions(BaseTestBigQueryAdapter):
def test_parse_partition_by(self):
adapter = self.get_adapter('oauth')

Expand Down