Add a BigQuery adapter macro to enable usage of CopyJobs #2709

Merged · 25 commits · Aug 19, 2020
Changes from 1 commit
35 changes: 34 additions & 1 deletion plugins/bigquery/dbt/adapters/bigquery/connections.py
@@ -26,6 +26,10 @@

BQ_QUERY_JOB_SPLIT = '-----Query Job SQL Follows-----'

# Write dispositions for BigQuery copy jobs: WRITE_APPEND appends rows to an
# existing table; WRITE_TRUNCATE overwrites it.
WRITE_APPEND = google.cloud.bigquery.job.WriteDisposition.WRITE_APPEND
WRITE_TRUNCATE = google.cloud.bigquery.job.WriteDisposition.WRITE_TRUNCATE

REOPENABLE_ERRORS = (
    ConnectionResetError,
    ConnectionError,
@@ -336,7 +340,7 @@ def create_table(self, database, schema, table_name, sql):

        table_ref = self.table_ref(database, schema, table_name, conn)
        job_params = {'destination': table_ref,
-                     'write_disposition': 'WRITE_TRUNCATE'}
+                     'write_disposition': WRITE_TRUNCATE}

        timeout = self.get_timeout(conn)

@@ -352,6 +356,35 @@ def callback(table):
        self.create_bigquery_table(database, schema, table_name, callback,
                                   'CREATE DAY PARTITIONED TABLE')

    def copy_bq_table(self, source, destination, materialization):
        conn = self.get_thread_connection()
        client = conn.handle

        if materialization == 'incremental':
            write_disposition = WRITE_APPEND
        else:
            write_disposition = WRITE_TRUNCATE

        source = self.table_ref(
            source.database, source.schema, source.table, conn)
        destination = self.table_ref(
            destination.database, destination.schema, destination.table, conn)
        logger.debug(
            'Copying table "{}" to "{}" with disposition: "{}"', source.path,
            destination.path, write_disposition)

        def copy_and_results():
            job_config = google.cloud.bigquery.CopyJobConfig(
                write_disposition=write_disposition)
            copy_job = client.copy_table(
                source, destination, job_config=job_config)
            iterator = copy_job.result(timeout=self.get_timeout(conn))
            return copy_job, iterator

        self._retry_and_handle(
            msg='copy table "{}" to "{}"'.format(
                source.path, destination.path), conn=conn, fn=copy_and_results)


    @staticmethod
    def dataset(database, schema, conn):
        dataset_ref = conn.handle.dataset(schema, database)
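For context, the client call this method wraps looks roughly like the standalone sketch below. It is a minimal illustration, assuming application-default credentials; the project, dataset, and table names are invented for the example and are not part of this PR.

# Minimal sketch of a BigQuery copy job (all names below are illustrative).
import google.cloud.bigquery

client = google.cloud.bigquery.Client()
job_config = google.cloud.bigquery.CopyJobConfig(
    write_disposition=google.cloud.bigquery.job.WriteDisposition.WRITE_TRUNCATE)
copy_job = client.copy_table(
    'my-project.my_dataset.source_table',  # source table ID
    'my-project.my_dataset.dest_table',    # destination table ID
    job_config=job_config)
copy_job.result(timeout=300)  # block until the copy completes or times out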
5 changes: 5 additions & 0 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
@@ -416,6 +416,11 @@ def _materialize_as_table(

return "CREATE TABLE"

    @available.parse(lambda *a, **k: '')
    def copy_table(self, source, destination, materialization):
        # Exposed to the Jinja context via @available; delegates to the
        # connection manager, which submits the BigQuery copy job.
        self.connections.copy_bq_table(
            source, destination, materialization)

    @classmethod
    def poll_until_job_completes(cls, job, timeout):
        retry_count = timeout
31 changes: 31 additions & 0 deletions plugins/bigquery/dbt/include/bigquery/macros/copy_table.sql
@@ -0,0 +1,31 @@
{# The copy_table macro submits a table copy job to BigQuery, which should run
 # faster than a `select * from` statement. The macro takes a BigQueryRelation
 # (returned by ref() or source()) defining the source table for the copy, which
 # is forwarded to the adapter. If the materialization is set to table, it creates
 # the destination table or overwrites an existing one; if incremental, it appends
 # to the existing table. Other materializations are not supported and raise an error.
 #}
{% macro copy_table(source) %}

{%- if not execute -%}
  {%- set materialized_method = config.source_config['config'].get('materialized', '') -%}
  {{ config(copy_materialization=materialized_method) }}
  {%- if materialized_method not in ('table', 'incremental') -%}
    {{
      exceptions.raise_not_implemented(
          'Copy must materialize as table or incremental, not %s' %
          materialized_method)
    }}
  {%- endif -%}
{%- endif -%}

{{ config(materialized='copy') }}

{%- set destination = api.Relation.create(
    database=database, schema=schema, identifier=model['alias'], type='table') -%}

{{
  adapter.copy_table(
      source,
      destination,
      config.get('copy_materialization'))
}}

{%- endmacro %}
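For illustration, a model invoking this macro might look like the sketch below; the model file name and the source names are hypothetical, not part of this PR.

-- models/copied_table.sql (hypothetical usage sketch)
{{ config(materialized='table') }}
{{ copy_table(source('my_source', 'raw_events')) }}

The declared table materialization is read by the macro to choose WRITE_TRUNCATE; declaring incremental instead would select WRITE_APPEND.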
@@ -0,0 +1,15 @@
{% materialization copy, adapter='bigquery' -%}

  {# Setup #}
  {{ run_hooks(pre_hooks) }}

  {# Execute the macro SQL #}
  {{ write(sql) }}
  {{ store_result(name='main', status='COPY TABLE') }}

  {# Clean up #}
  {{ run_hooks(post_hooks) }}
  {{ adapter.commit() }}

  {{ return({'relations': [context['destination']]}) }}
{%- endmaterialization %}
52 changes: 50 additions & 2 deletions test/unit/test_bigquery_adapter.py
@@ -4,7 +4,7 @@
import unittest
from contextlib import contextmanager
from requests.exceptions import ConnectionError
-from unittest.mock import patch, MagicMock, Mock
+from unittest.mock import patch, MagicMock, Mock, ANY

import hologram

@@ -21,6 +21,8 @@
import dbt.exceptions
from dbt.logger import GLOBAL_LOGGER as logger # noqa

import google.cloud.bigquery

from .utils import config_from_parts_or_dicts, inject_adapter, TestAdapterConversions


@@ -516,8 +518,54 @@ def test_query_and_results(self, mock_bq):
        self.mock_client.query.assert_called_once_with(
            'sql', job_config=mock_bq.QueryJobConfig())


    def test_copy_bq_table_appends(self):
        self._copy_table(materialization='incremental')
        self.mock_client.copy_table.assert_called_once_with(
            self._table_ref('project', 'dataset', 'table1', None),
            self._table_ref('project', 'dataset', 'table2', None),
            job_config=ANY)
        args, kwargs = self.mock_client.copy_table.call_args
        self.assertEqual(
            kwargs['job_config'].write_disposition,
            dbt.adapters.bigquery.connections.WRITE_APPEND)

    def test_copy_bq_table_truncates(self):
        self._copy_table(materialization='table')
        self.mock_client.copy_table.assert_called_once_with(
            self._table_ref('project', 'dataset', 'table1', None),
            self._table_ref('project', 'dataset', 'table2', None),
            job_config=ANY)
        args, kwargs = self.mock_client.copy_table.call_args
        self.assertEqual(
            kwargs['job_config'].write_disposition,
            dbt.adapters.bigquery.connections.WRITE_TRUNCATE)

    def _table_ref(self, proj, ds, table, conn):
        return google.cloud.bigquery.table.TableReference.from_string(
            '{}.{}.{}'.format(proj, ds, table))

    def _copy_table(self, materialization):
        self.connections.table_ref = self._table_ref
        source = BigQueryRelation.create(
            database='project', schema='dataset', identifier='table1')
        destination = BigQueryRelation.create(
            database='project', schema='dataset', identifier='table2')
        self.connections.copy_bq_table(source, destination, materialization)


class TestBigQueryAdapter(BaseTestBigQueryAdapter):

    def test_copy_table(self):
        adapter = self.get_adapter('oauth')
        adapter.connections = MagicMock()
        adapter.copy_table('source', 'destination', 'materialization')
        adapter.connections.copy_bq_table.assert_called_once_with(
            'source', 'destination', 'materialization')

class TestBigQueryTableOptions(BaseTestBigQueryAdapter):
    def test_parse_partition_by(self):
        adapter = self.get_adapter('oauth')
