Add to_bqstorage to convert from Table[Reference] to google-cloud-bigquery-storage reference #6840

Merged: 4 commits, Dec 5, 2018
19 changes: 9 additions & 10 deletions bigquery/docs/snippets.py
@@ -1314,7 +1314,7 @@ def test_load_table_from_file(client, to_delete):


def test_load_table_from_uri_avro(client, to_delete, capsys):
-dataset_id = 'load_table_from_uri_avro_{}'.format(_millis())
+dataset_id = "load_table_from_uri_avro_{}".format(_millis())
dataset = bigquery.Dataset(client.dataset(dataset_id))
client.create_dataset(dataset)
to_delete.append(dataset)
@@ -1327,23 +1327,22 @@ def test_load_table_from_uri_avro(client, to_delete, capsys):
dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.AVRO
-uri = 'gs://cloud-samples-data/bigquery/us-states/us-states.avro'
+uri = "gs://cloud-samples-data/bigquery/us-states/us-states.avro"

load_job = client.load_table_from_uri(
-uri,
-dataset_ref.table('us_states'),
-job_config=job_config) # API request
-print('Starting job {}'.format(load_job.job_id))
+uri, dataset_ref.table("us_states"), job_config=job_config
+) # API request
+print("Starting job {}".format(load_job.job_id))

load_job.result() # Waits for table load to complete.
-print('Job finished.')
+print("Job finished.")

-destination_table = client.get_table(dataset_ref.table('us_states'))
-print('Loaded {} rows.'.format(destination_table.num_rows))
+destination_table = client.get_table(dataset_ref.table("us_states"))
+print("Loaded {} rows.".format(destination_table.num_rows))
# [END bigquery_load_table_gcs_avro]

out, _ = capsys.readouterr()
-assert 'Loaded 50 rows.' in out
+assert "Loaded 50 rows." in out


def test_load_table_from_uri_csv(client, to_delete, capsys):
76 changes: 76 additions & 0 deletions bigquery/google/cloud/bigquery/table.py
@@ -279,6 +279,38 @@ def to_api_repr(self):
"tableId": self._table_id,
}

def to_bqstorage(self):
"""Construct a BigQuery Storage API representation of this table.

If the ``table_id`` contains a partition identifier (e.g.
``my_table$201812``) or a snapshot identifier (e.g.
``mytable@1234567890``), it is ignored. Use
:class:`google.cloud.bigquery_storage_v1beta1.types.TableReadOptions`
to filter rows by partition. Use
:class:`google.cloud.bigquery_storage_v1beta1.types.TableModifiers`
to select a specific snapshot to read from.

Returns:
google.cloud.bigquery_storage_v1beta1.types.TableReference:
A reference to this table in the BigQuery Storage API.
"""
from google.cloud import bigquery_storage_v1beta1

table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = self._project
table_ref.dataset_id = self._dataset_id
table_id = self._table_id

if "@" in table_id:
table_id = table_id.split("@")[0]

if "$" in table_id:
table_id = table_id.split("$")[0]

table_ref.table_id = table_id

return table_ref

def _key(self):
"""A tuple key that uniquely describes this field.

@@ -820,6 +852,15 @@ def to_api_repr(self):
"""
return copy.deepcopy(self._properties)

def to_bqstorage(self):
"""Construct a BigQuery Storage API representation of this table.

Returns:
google.cloud.bigquery_storage_v1beta1.types.TableReference:
A reference to this table in the BigQuery Storage API.
"""
return self.reference.to_bqstorage()

def _build_resource(self, filter_fields):
"""Generate a resource for ``update``."""
partial = {}
@@ -971,6 +1012,41 @@ def friendly_name(self):

view_use_legacy_sql = property(_view_use_legacy_sql_getter)

@classmethod
def from_string(cls, full_table_id):
"""Construct a table from a fully-qualified table ID.

Args:
full_table_id (str):
A fully-qualified table ID in standard SQL format. Must
include a project ID, dataset ID, and table ID, each
separated by ``.``.

Returns:
Table: Table parsed from ``full_table_id``.

Examples:
>>> Table.from_string('my-project.mydataset.mytable')
Table(TableRef...(D...('my-project', 'mydataset'), 'mytable'))

Raises:
ValueError:
If ``full_table_id`` is not a fully-qualified table ID in
standard SQL format.
"""
return cls(
{"tableReference": TableReference.from_string(full_table_id).to_api_repr()}
)

def to_bqstorage(self):
"""Construct a BigQuery Storage API representation of this table.

Returns:
google.cloud.bigquery_storage_v1beta1.types.TableReference:
A reference to this table in the BigQuery Storage API.
"""
return self.reference.to_bqstorage()


def _row_from_mapping(mapping, schema):
"""Convert a mapping to a row tuple using the schema.
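The docstrings above point to ``TableReadOptions`` and ``TableModifiers`` for partition and snapshot selection once the decorator has been stripped. Below is a minimal sketch of how the returned reference might be consumed with the BigQuery Storage client; the project, dataset, table, and partition filter are hypothetical and the snippet is not part of this diff:

from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Build a Storage API reference from a standard SQL table ID. Any partition
# ($YYYYMMDD) or snapshot (@millis) decorator would be stripped by
# to_bqstorage(), so the read is narrowed with TableReadOptions instead.
table_ref = bigquery.TableReference.from_string(
    "my-project.my_dataset.my_table"
).to_bqstorage()

read_options = bigquery_storage_v1beta1.types.TableReadOptions(
    row_restriction="_PARTITIONDATE = '2018-12-25'"  # hypothetical partition filter
)

client = bigquery_storage_v1beta1.BigQueryStorageClient()
session = client.create_read_session(
    table_ref,
    parent="projects/my-project",
    read_options=read_options,
)
# session.streams can then be read with client.read_rows(...); a
# types.TableModifiers message passed as table_modifiers= would pin the
# read to a specific snapshot time.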
6 changes: 3 additions & 3 deletions bigquery/noxfile.py
@@ -20,7 +20,7 @@


LOCAL_DEPS = (
-os.path.join('..', 'api_core'),
+os.path.join('..', 'api_core[grpc]'),
os.path.join('..', 'core'),
)

@@ -40,9 +40,9 @@ def default(session):

# Pyarrow does not support Python 3.7
if session.python == '3.7':
-dev_install = '.[pandas]'
+dev_install = '.[bqstorage, pandas]'
else:
-dev_install = '.[pandas, pyarrow]'
+dev_install = '.[bqstorage, pandas, pyarrow]'
session.install('-e', dev_install)

# IPython does not support Python 2 after version 5.x
1 change: 1 addition & 0 deletions bigquery/setup.py
@@ -34,6 +34,7 @@
'google-resumable-media >= 0.3.1',
]
extras = {
'bqstorage': 'google-cloud-bigquery-storage<=2.0.0dev',
'pandas': 'pandas>=0.17.1',
# Exclude PyArrow dependency from Windows Python 2.7.
'pyarrow: platform_system != "Windows" or python_version >= "3.4"':
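Because the Storage API client is declared as an optional extra here, a consumer would presumably install it with ``pip install 'google-cloud-bigquery[bqstorage]'`` and guard the import, as the unit tests below do. A minimal sketch, not part of this diff:

# Optional dependency: only available when the package was installed with the
# "bqstorage" extra, e.g.  pip install 'google-cloud-bigquery[bqstorage]'
try:
    from google.cloud import bigquery_storage_v1beta1
except ImportError:
    bigquery_storage_v1beta1 = None  # feature gated off when the extra is missing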
2 changes: 1 addition & 1 deletion bigquery/tests/unit/test_job.py
@@ -800,7 +800,7 @@ def test_result_default_wo_state(self, result):
begin.assert_called_once_with(retry=DEFAULT_RETRY)
result.assert_called_once_with(timeout=None)

-@mock.patch('google.api_core.future.polling.PollingFuture.result')
+@mock.patch("google.api_core.future.polling.PollingFuture.result")
def test_result_w_retry_wo_state(self, result):
client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, client)
31 changes: 31 additions & 0 deletions bigquery/tests/unit/test_table.py
@@ -12,11 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import unittest

import mock
import pytest
import six

try:
from google.cloud import bigquery_storage_v1beta1
except ImportError: # pragma: NO COVER
bigquery_storage_v1beta1 = None
try:
import pandas
except (ImportError, AttributeError): # pragma: NO COVER
@@ -1688,3 +1694,28 @@ def test_set_expiration_w_none(self):
time_partitioning = self._make_one()
time_partitioning.expiration_ms = None
assert time_partitioning._properties["expirationMs"] is None


@pytest.mark.skipif(
bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test_table_reference_to_bqstorage():
from google.cloud.bigquery import table as mut

# Can't use parametrized pytest because bigquery_storage_v1beta1 may not be
# available.
expected = bigquery_storage_v1beta1.types.TableReference(
project_id="my-project", dataset_id="my_dataset", table_id="my_table"
)
cases = (
"my-project.my_dataset.my_table",
"my-project.my_dataset.my_table$20181225",
"my-project.my_dataset.my_table@1234567890",
"my-project.my_dataset.my_table$20181225@1234567890",
)

classes = (mut.TableReference, mut.Table, mut.TableListItem)

for case, cls in itertools.product(cases, classes):
got = cls.from_string(case).to_bqstorage()
assert got == expected
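
The stripping behavior exercised by this test can also be checked interactively; a minimal sketch, assuming the ``bqstorage`` extra is installed:

>>> from google.cloud.bigquery.table import TableReference
>>> ref = TableReference.from_string("my-project.my_dataset.my_table$20181225")
>>> ref.to_bqstorage().table_id
'my_table'
>>> ref.to_bqstorage().dataset_id
'my_dataset'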