From 92e5119e9fa414f395974541a10f1fe30efecaf8 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Fri, 2 Dec 2016 17:52:42 -0500 Subject: [PATCH] Add support for query parameters (#2776) * Add 'ScalarQueryParameter' class. Holds name, type, and value for scalar query parameters, and handles marshalling them to / from JSON representation mandated by the BigQuery API. * Factor out 'AbstractQueryParameter. * Add 'ArrayQueryParameter' class. Holds name, type, and value for array query parameters, and handles marshalling them to / from JSON representation mandated by the BigQuery API. * Add 'StructQueryParameter' class. Holds name, types, and values for Struct query parameters, and handles marshalling them to / from JSON representation mandated by the BigQuery API. * Add 'QueryParametersProperty' descriptor class. * Add 'query_parameters' property to 'QueryResults' and 'QueryJob'. * Plumb 'udf_resources'/'query_parameters' through client query factories. * Expose concrete query parameter classes as package APIs. Closes #2551. --- bigquery/google/cloud/bigquery/__init__.py | 3 + bigquery/google/cloud/bigquery/_helpers.py | 285 +++++++++++++++- bigquery/google/cloud/bigquery/client.py | 35 +- bigquery/google/cloud/bigquery/job.py | 30 +- bigquery/google/cloud/bigquery/query.py | 28 +- bigquery/unit_tests/test__helpers.py | 357 +++++++++++++++++++++ bigquery/unit_tests/test_client.py | 89 ++++- bigquery/unit_tests/test_job.py | 271 ++++++++++++---- bigquery/unit_tests/test_query.py | 118 ++++++- docs/bigquery-usage.rst | 6 + docs/bigquery_snippets.py | 34 +- 11 files changed, 1158 insertions(+), 98 deletions(-) diff --git a/bigquery/google/cloud/bigquery/__init__.py b/bigquery/google/cloud/bigquery/__init__.py index cde9432d83e2..9b5809af6cf7 100644 --- a/bigquery/google/cloud/bigquery/__init__.py +++ b/bigquery/google/cloud/bigquery/__init__.py @@ -23,6 +23,9 @@ """ +from google.cloud.bigquery._helpers import ArrayQueryParameter +from google.cloud.bigquery._helpers import ScalarQueryParameter +from google.cloud.bigquery._helpers import StructQueryParameter from google.cloud.bigquery.client import Client from google.cloud.bigquery.dataset import AccessGrant from google.cloud.bigquery.dataset import Dataset diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py index f68018e706cc..202a39ac8447 100644 --- a/bigquery/google/cloud/bigquery/_helpers.py +++ b/bigquery/google/cloud/bigquery/_helpers.py @@ -14,6 +14,8 @@ """Shared helper functions for BigQuery API classes.""" +from collections import OrderedDict + from google.cloud._helpers import _datetime_from_microseconds from google.cloud._helpers import _date_from_iso8601_date @@ -230,16 +232,279 @@ def __set__(self, instance, value): instance._udf_resources = tuple(value) -def _build_udf_resources(resources): +class AbstractQueryParameter(object): + """Base class for named / positional query parameters. """ - :type resources: sequence of :class:`UDFResource` - :param resources: fields to be appended. + @classmethod + def from_api_repr(cls, resource): + """Factory: construct paramter from JSON resource. + + :type resource: dict + :param resource: JSON mapping of parameter + + :rtype: :class:`ScalarQueryParameter` + """ + raise NotImplementedError + + def to_api_repr(self): + """Construct JSON API representation for the parameter. + + :rtype: dict + """ + raise NotImplementedError + - :rtype: mapping - :returns: a mapping describing userDefinedFunctionResources for the query. +class ScalarQueryParameter(AbstractQueryParameter): + """Named / positional query parameters for scalar values. + + :type name: str or None + :param name: Parameter name, used via ``@foo`` syntax. If None, the + paramter can only be addressed via position (``?``). + + :type type_: str + :param type_: name of parameter type. One of `'STRING'`, `'INT64'`, + `'FLOAT64'`, `'BOOL'`, `'TIMESTAMP'`, or `'DATE'`. + + :type value: str, int, float, bool, :class:`datetime.datetime`, or + :class:`datetime.date`. + :param value: the scalar parameter value. """ - udfs = [] - for resource in resources: - udf = {resource.udf_type: resource.value} - udfs.append(udf) - return udfs + def __init__(self, name, type_, value): + self.name = name + self.type_ = type_ + self.value = value + + @classmethod + def positional(cls, type_, value): + """Factory for positional paramters. + + :type type_: str + :param type_: name of paramter type. One of `'STRING'`, `'INT64'`, + `'FLOAT64'`, `'BOOL'`, `'TIMESTAMP'`, or `'DATE'`. + + :type value: str, int, float, bool, :class:`datetime.datetime`, or + :class:`datetime.date`. + :param value: the scalar parameter value. + + :rtype: :class:`ScalarQueryParameter` + :returns: instance without name + """ + return cls(None, type_, value) + + @classmethod + def from_api_repr(cls, resource): + """Factory: construct paramter from JSON resource. + + :type resource: dict + :param resource: JSON mapping of parameter + + :rtype: :class:`ScalarQueryParameter` + :returns: instance + """ + name = resource.get('name') + type_ = resource['parameterType']['type'] + value = resource['parameterValue']['value'] + converted = _CELLDATA_FROM_JSON[type_](value, None) + return cls(name, type_, converted) + + def to_api_repr(self): + """Construct JSON API representation for the parameter. + + :rtype: dict + :returns: JSON mapping + """ + resource = { + 'parameterType': { + 'type': self.type_, + }, + 'parameterValue': { + 'value': self.value, + }, + } + if self.name is not None: + resource['name'] = self.name + return resource + + +class ArrayQueryParameter(AbstractQueryParameter): + """Named / positional query parameters for array values. + + :type name: str or None + :param name: Parameter name, used via ``@foo`` syntax. If None, the + paramter can only be addressed via position (``?``). + + :type array_type: str + :param array_type: + name of type of array elements. One of `'STRING'`, `'INT64'`, + `'FLOAT64'`, `'BOOL'`, `'TIMESTAMP'`, or `'DATE'`. + + :type values: list of appropriate scalar type. + :param values: the parameter array values. + """ + def __init__(self, name, array_type, values): + self.name = name + self.array_type = array_type + self.values = values + + @classmethod + def positional(cls, array_type, values): + """Factory for positional paramters. + + :type array_type: str + :param array_type: + name of type of array elements. One of `'STRING'`, `'INT64'`, + `'FLOAT64'`, `'BOOL'`, `'TIMESTAMP'`, or `'DATE'`. + + :type values: list of appropriate scalar type + :param values: the parameter array values. + + :rtype: :class:`ArrayQueryParameter` + :returns: instance without name + """ + return cls(None, array_type, values) + + @classmethod + def from_api_repr(cls, resource): + """Factory: construct paramter from JSON resource. + + :type resource: dict + :param resource: JSON mapping of parameter + + :rtype: :class:`ArrayQueryParameter` + :returns: instance + """ + name = resource.get('name') + array_type = resource['parameterType']['arrayType'] + values = resource['parameterValue']['arrayValues'] + converted = [ + _CELLDATA_FROM_JSON[array_type](value, None) for value in values] + return cls(name, array_type, converted) + + def to_api_repr(self): + """Construct JSON API representation for the parameter. + + :rtype: dict + :returns: JSON mapping + """ + resource = { + 'parameterType': { + 'arrayType': self.array_type, + }, + 'parameterValue': { + 'arrayValues': self.values, + }, + } + if self.name is not None: + resource['name'] = self.name + return resource + + +class StructQueryParameter(AbstractQueryParameter): + """Named / positional query parameters for struct values. + + :type name: str or None + :param name: Parameter name, used via ``@foo`` syntax. If None, the + paramter can only be addressed via position (``?``). + + :type sub_params: tuple of :class:`ScalarQueryParameter` + :param sub_params: the sub-parameters for the struct + """ + def __init__(self, name, *sub_params): + self.name = name + self.struct_types = OrderedDict( + (sub.name, sub.type_) for sub in sub_params) + self.struct_values = {sub.name: sub.value for sub in sub_params} + + @classmethod + def positional(cls, *sub_params): + """Factory for positional paramters. + + :type sub_params: tuple of :class:`ScalarQueryParameter` + :param sub_params: the sub-parameters for the struct + + :rtype: :class:`StructQueryParameter` + :returns: instance without name + """ + return cls(None, *sub_params) + + @classmethod + def from_api_repr(cls, resource): + """Factory: construct paramter from JSON resource. + + :type resource: dict + :param resource: JSON mapping of parameter + + :rtype: :class:`StructQueryParameter` + :returns: instance + """ + name = resource.get('name') + instance = cls(name) + types = instance.struct_types + for item in resource['parameterType']['structTypes']: + types[item['name']] = item['type'] + struct_values = resource['parameterValue']['structValues'] + for key, value in struct_values.items(): + converted = _CELLDATA_FROM_JSON[types[key]](value, None) + instance.struct_values[key] = converted + return instance + + def to_api_repr(self): + """Construct JSON API representation for the parameter. + + :rtype: dict + :returns: JSON mapping + """ + types = [ + {'name': key, 'type': value} + for key, value in self.struct_types.items() + ] + resource = { + 'parameterType': { + 'structTypes': types, + }, + 'parameterValue': { + 'structValues': self.struct_values, + }, + } + if self.name is not None: + resource['name'] = self.name + return resource + + +class QueryParametersProperty(object): + """Custom property type, holding query parameter instances.""" + + def __get__(self, instance, owner): + """Descriptor protocol: accessor + + :type instance: :class:`QueryParametersProperty` + :param instance: instance owning the property (None if accessed via + the class). + + :type owner: type + :param owner: the class owning the property. + + :rtype: list of instances of classes derived from + :class:`AbstractQueryParameter`. + :returns: the descriptor, if accessed via the class, or the instance's + query paramters. + """ + if instance is None: + return self + return list(instance._query_parameters) + + def __set__(self, instance, value): + """Descriptor protocol: mutator + + :type instance: :class:`QueryParametersProperty` + :param instance: instance owning the property (None if accessed via + the class). + + :type value: list of instances of classes derived from + :class:`AbstractQueryParameter`. + :param value: new query parameters for the instance. + """ + if not all(isinstance(u, AbstractQueryParameter) for u in value): + raise ValueError( + "query parameters must be derived from AbstractQueryParameter") + instance._query_parameters = tuple(value) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index d16a9d9349d2..4af5b8fc4910 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -275,7 +275,8 @@ def extract_table_to_storage(self, job_name, source, *destination_uris): return ExtractTableToStorageJob(job_name, source, destination_uris, client=self) - def run_async_query(self, job_name, query): + def run_async_query(self, job_name, query, + udf_resources=(), query_parameters=()): """Construct a job for running a SQL query asynchronously. See: @@ -287,21 +288,47 @@ def run_async_query(self, job_name, query): :type query: str :param query: SQL query to be executed + :type udf_resources: tuple + :param udf_resources: An iterable of + :class:`google.cloud.bigquery._helpers.UDFResource` + (empty by default) + + :type query_parameters: tuple + :param query_parameters: + An iterable of + :class:`google.cloud.bigquery._helpers.AbstractQueryParameter` + (empty by default) + :rtype: :class:`google.cloud.bigquery.job.QueryJob` :returns: a new ``QueryJob`` instance """ - return QueryJob(job_name, query, client=self) + return QueryJob(job_name, query, client=self, + udf_resources=udf_resources, + query_parameters=query_parameters) - def run_sync_query(self, query): + def run_sync_query(self, query, udf_resources=(), query_parameters=()): """Run a SQL query synchronously. :type query: str :param query: SQL query to be executed + :type udf_resources: tuple + :param udf_resources: An iterable of + :class:`google.cloud.bigquery._helpers.UDFResource` + (empty by default) + + :type query_parameters: tuple + :param query_parameters: + An iterable of + :class:`google.cloud.bigquery._helpers.AbstractQueryParameter` + (empty by default) + :rtype: :class:`google.cloud.bigquery.query.QueryResults` :returns: a new ``QueryResults`` instance """ - return QueryResults(query, client=self) + return QueryResults(query, client=self, + udf_resources=udf_resources, + query_parameters=query_parameters) # pylint: disable=unused-argument diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 203dd2df6dd0..5eff2b74ef90 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -23,10 +23,10 @@ from google.cloud.bigquery.table import Table from google.cloud.bigquery.table import _build_schema_resource from google.cloud.bigquery.table import _parse_schema_resource +from google.cloud.bigquery._helpers import QueryParametersProperty from google.cloud.bigquery._helpers import UDFResourcesProperty from google.cloud.bigquery._helpers import _EnumProperty from google.cloud.bigquery._helpers import _TypedProperty -from google.cloud.bigquery._helpers import _build_udf_resources class Compression(_EnumProperty): @@ -909,14 +909,23 @@ class QueryJob(_AsyncJob): :param udf_resources: An iterable of :class:`google.cloud.bigquery._helpers.UDFResource` (empty by default) + + :type query_parameters: tuple + :param query_parameters: + An iterable of + :class:`google.cloud.bigquery._helpers.AbstractQueryParameter` + (empty by default) """ _JOB_TYPE = 'query' _UDF_KEY = 'userDefinedFunctionResources' + _QUERY_PARAMETERS_KEY = 'queryParameters' - def __init__(self, name, query, client, udf_resources=()): + def __init__(self, name, query, client, + udf_resources=(), query_parameters=()): super(QueryJob, self).__init__(name, client) self.query = query self.udf_resources = udf_resources + self.query_parameters = query_parameters self._configuration = _AsyncQueryConfiguration() allow_large_results = _TypedProperty('allow_large_results', bool) @@ -949,6 +958,8 @@ def __init__(self, name, query, client, udf_resources=()): https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.priority """ + query_parameters = QueryParametersProperty() + udf_resources = UDFResourcesProperty() use_query_cache = _TypedProperty('use_query_cache', bool) @@ -1032,8 +1043,19 @@ def _populate_config_resource(self, configuration): if self.maximum_bytes_billed is not None: configuration['maximumBytesBilled'] = self.maximum_bytes_billed if len(self._udf_resources) > 0: - configuration[self._UDF_KEY] = _build_udf_resources( - self._udf_resources) + configuration[self._UDF_KEY] = [ + {udf_resource.udf_type: udf_resource.value} + for udf_resource in self._udf_resources + ] + if len(self._query_parameters) > 0: + configuration[self._QUERY_PARAMETERS_KEY] = [ + query_parameter.to_api_repr() + for query_parameter in self._query_parameters + ] + if self._query_parameters[0].name is None: + configuration['parameterMode'] = 'POSITIONAL' + else: + configuration['parameterMode'] = 'NAMED' def _build_resource(self): """Generate a resource for :meth:`begin`.""" diff --git a/bigquery/google/cloud/bigquery/query.py b/bigquery/google/cloud/bigquery/query.py index fa1b1da63883..95d2eabdbdbe 100644 --- a/bigquery/google/cloud/bigquery/query.py +++ b/bigquery/google/cloud/bigquery/query.py @@ -21,7 +21,7 @@ from google.cloud.bigquery.dataset import Dataset from google.cloud.bigquery.job import QueryJob from google.cloud.bigquery.table import _parse_schema_resource -from google.cloud.bigquery._helpers import _build_udf_resources +from google.cloud.bigquery._helpers import QueryParametersProperty from google.cloud.bigquery._helpers import UDFResourcesProperty @@ -53,16 +53,24 @@ class QueryResults(object): :param udf_resources: An iterable of :class:`google.cloud.bigquery.job.UDFResource` (empty by default) + + :type query_parameters: tuple + :param query_parameters: + An iterable of + :class:`google.cloud.bigquery._helpers.AbstractQueryParameter` + (empty by default) """ _UDF_KEY = 'userDefinedFunctionResources' + _QUERY_PARAMETERS_KEY = 'queryParameters' - def __init__(self, query, client, udf_resources=()): + def __init__(self, query, client, udf_resources=(), query_parameters=()): self._client = client self._properties = {} self.query = query self._configuration = _SyncQueryConfiguration() self.udf_resources = udf_resources + self.query_parameters = query_parameters self._job = None @classmethod @@ -258,6 +266,8 @@ def schema(self): https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#preserveNulls """ + query_parameters = QueryParametersProperty() + timeout_ms = _TypedProperty('timeout_ms', six.integer_types) """See: https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#timeoutMs @@ -314,7 +324,19 @@ def _build_resource(self): resource['dryRun'] = self.dry_run if len(self._udf_resources) > 0: - resource[self._UDF_KEY] = _build_udf_resources(self._udf_resources) + resource[self._UDF_KEY] = [ + {udf_resource.udf_type: udf_resource.value} + for udf_resource in self._udf_resources + ] + if len(self._query_parameters) > 0: + resource[self._QUERY_PARAMETERS_KEY] = [ + query_parameter.to_api_repr() + for query_parameter in self._query_parameters + ] + if self._query_parameters[0].name is None: + resource['parameterMode'] = 'POSITIONAL' + else: + resource['parameterMode'] = 'NAMED' return resource diff --git a/bigquery/unit_tests/test__helpers.py b/bigquery/unit_tests/test__helpers.py index 14dd39f62e53..b133e95d45a7 100644 --- a/bigquery/unit_tests/test__helpers.py +++ b/bigquery/unit_tests/test__helpers.py @@ -586,6 +586,363 @@ def test_instance_setter_w_bad_udfs(self): self.assertEqual(instance.udf_resources, []) +class Test_AbstractQueryParameter(unittest.TestCase): + + @staticmethod + def _get_target_class(): + from google.cloud.bigquery._helpers import AbstractQueryParameter + return AbstractQueryParameter + + def _make_one(self, *args, **kw): + return self._get_target_class()(*args, **kw) + + def test_from_api_virtual(self): + klass = self._get_target_class() + with self.assertRaises(NotImplementedError): + klass.from_api_repr({}) + + def test_to_api_virtual(self): + param = self._make_one() + with self.assertRaises(NotImplementedError): + param.to_api_repr() + + +class Test_ScalarQueryParameter(unittest.TestCase): + + @staticmethod + def _get_target_class(): + from google.cloud.bigquery._helpers import ScalarQueryParameter + return ScalarQueryParameter + + def _make_one(self, *args, **kw): + return self._get_target_class()(*args, **kw) + + def test_ctor(self): + param = self._make_one(name='foo', type_='INT64', value=123) + self.assertEqual(param.name, 'foo') + self.assertEqual(param.type_, 'INT64') + self.assertEqual(param.value, 123) + + def test_positional(self): + klass = self._get_target_class() + param = klass.positional(type_='INT64', value=123) + self.assertEqual(param.name, None) + self.assertEqual(param.type_, 'INT64') + self.assertEqual(param.value, 123) + + def test_from_api_repr_w_name(self): + RESOURCE = { + 'name': 'foo', + 'parameterType': { + 'type': 'INT64', + }, + 'parameterValue': { + 'value': 123, + }, + } + klass = self._get_target_class() + param = klass.from_api_repr(RESOURCE) + self.assertEqual(param.name, 'foo') + self.assertEqual(param.type_, 'INT64') + self.assertEqual(param.value, 123) + + def test_from_api_repr_wo_name(self): + RESOURCE = { + 'parameterType': { + 'type': 'INT64', + }, + 'parameterValue': { + 'value': '123', + }, + } + klass = self._get_target_class() + param = klass.from_api_repr(RESOURCE) + self.assertEqual(param.name, None) + self.assertEqual(param.type_, 'INT64') + self.assertEqual(param.value, 123) + + def test_to_api_repr_w_name(self): + EXPECTED = { + 'name': 'foo', + 'parameterType': { + 'type': 'INT64', + }, + 'parameterValue': { + 'value': 123, + }, + } + param = self._make_one(name='foo', type_='INT64', value=123) + self.assertEqual(param.to_api_repr(), EXPECTED) + + def test_to_api_repr_wo_name(self): + EXPECTED = { + 'parameterType': { + 'type': 'INT64', + }, + 'parameterValue': { + 'value': 123, + }, + } + klass = self._get_target_class() + param = klass.positional(type_='INT64', value=123) + self.assertEqual(param.to_api_repr(), EXPECTED) + + +class Test_ArrayQueryParameter(unittest.TestCase): + + @staticmethod + def _get_target_class(): + from google.cloud.bigquery._helpers import ArrayQueryParameter + return ArrayQueryParameter + + def _make_one(self, *args, **kw): + return self._get_target_class()(*args, **kw) + + def test_ctor(self): + param = self._make_one(name='foo', array_type='INT64', values=[1, 2]) + self.assertEqual(param.name, 'foo') + self.assertEqual(param.array_type, 'INT64') + self.assertEqual(param.values, [1, 2]) + + def test_positional(self): + klass = self._get_target_class() + param = klass.positional(array_type='INT64', values=[1, 2]) + self.assertEqual(param.name, None) + self.assertEqual(param.array_type, 'INT64') + self.assertEqual(param.values, [1, 2]) + + def test_from_api_repr_w_name(self): + RESOURCE = { + 'name': 'foo', + 'parameterType': { + 'arrayType': 'INT64', + }, + 'parameterValue': { + 'arrayValues': ['1', '2'], + }, + } + klass = self._get_target_class() + param = klass.from_api_repr(RESOURCE) + self.assertEqual(param.name, 'foo') + self.assertEqual(param.array_type, 'INT64') + self.assertEqual(param.values, [1, 2]) + + def test_from_api_repr_wo_name(self): + RESOURCE = { + 'parameterType': { + 'arrayType': 'INT64', + }, + 'parameterValue': { + 'arrayValues': ['1', '2'], + }, + } + klass = self._get_target_class() + param = klass.from_api_repr(RESOURCE) + self.assertEqual(param.name, None) + self.assertEqual(param.array_type, 'INT64') + self.assertEqual(param.values, [1, 2]) + + def test_to_api_repr_w_name(self): + EXPECTED = { + 'name': 'foo', + 'parameterType': { + 'arrayType': 'INT64', + }, + 'parameterValue': { + 'arrayValues': [1, 2], + }, + } + param = self._make_one(name='foo', array_type='INT64', values=[1, 2]) + self.assertEqual(param.to_api_repr(), EXPECTED) + + def test_to_api_repr_wo_name(self): + EXPECTED = { + 'parameterType': { + 'arrayType': 'INT64', + }, + 'parameterValue': { + 'arrayValues': [1, 2], + }, + } + klass = self._get_target_class() + param = klass.positional(array_type='INT64', values=[1, 2]) + self.assertEqual(param.to_api_repr(), EXPECTED) + + +class Test_StructQueryParameter(unittest.TestCase): + + @staticmethod + def _get_target_class(): + from google.cloud.bigquery._helpers import StructQueryParameter + return StructQueryParameter + + def _make_one(self, *args, **kw): + return self._get_target_class()(*args, **kw) + + @staticmethod + def _make_subparam(name, type_, value): + from google.cloud.bigquery._helpers import ScalarQueryParameter + return ScalarQueryParameter(name, type_, value) + + def test_ctor(self): + sub_1 = self._make_subparam('bar', 'INT64', 123) + sub_2 = self._make_subparam('baz', 'STRING', 'abc') + param = self._make_one('foo', sub_1, sub_2) + self.assertEqual(param.name, 'foo') + self.assertEqual(param.struct_types, {'bar': 'INT64', 'baz': 'STRING'}) + self.assertEqual(param.struct_values, {'bar': 123, 'baz': 'abc'}) + + def test_positional(self): + sub_1 = self._make_subparam('bar', 'INT64', 123) + sub_2 = self._make_subparam('baz', 'STRING', 'abc') + klass = self._get_target_class() + param = klass.positional(sub_1, sub_2) + self.assertEqual(param.name, None) + self.assertEqual(param.struct_types, {'bar': 'INT64', 'baz': 'STRING'}) + self.assertEqual(param.struct_values, {'bar': 123, 'baz': 'abc'}) + + def test_from_api_repr_w_name(self): + RESOURCE = { + 'name': 'foo', + 'parameterType': { + 'structTypes': [ + {'name': 'bar', 'type': 'INT64'}, + {'name': 'baz', 'type': 'STRING'}, + ], + }, + 'parameterValue': { + 'structValues': {'bar': 123, 'baz': 'abc'}, + }, + } + klass = self._get_target_class() + param = klass.from_api_repr(RESOURCE) + self.assertEqual(param.name, 'foo') + self.assertEqual(param.struct_types, {'bar': 'INT64', 'baz': 'STRING'}) + self.assertEqual(param.struct_values, {'bar': 123, 'baz': 'abc'}) + + def test_from_api_repr_wo_name(self): + RESOURCE = { + 'parameterType': { + 'structTypes': [ + {'name': 'bar', 'type': 'INT64'}, + {'name': 'baz', 'type': 'STRING'}, + ], + }, + 'parameterValue': { + 'structValues': {'bar': 123, 'baz': 'abc'}, + }, + } + klass = self._get_target_class() + param = klass.from_api_repr(RESOURCE) + self.assertEqual(param.name, None) + self.assertEqual(param.struct_types, {'bar': 'INT64', 'baz': 'STRING'}) + self.assertEqual(param.struct_values, {'bar': 123, 'baz': 'abc'}) + + def test_to_api_repr_w_name(self): + EXPECTED = { + 'name': 'foo', + 'parameterType': { + 'structTypes': [ + {'name': 'bar', 'type': 'INT64'}, + {'name': 'baz', 'type': 'STRING'}, + ], + }, + 'parameterValue': { + 'structValues': {'bar': 123, 'baz': 'abc'}, + }, + } + sub_1 = self._make_subparam('bar', 'INT64', 123) + sub_2 = self._make_subparam('baz', 'STRING', 'abc') + param = self._make_one('foo', sub_1, sub_2) + self.assertEqual(param.to_api_repr(), EXPECTED) + + def test_to_api_repr_wo_name(self): + EXPECTED = { + 'parameterType': { + 'structTypes': [ + {'name': 'bar', 'type': 'INT64'}, + {'name': 'baz', 'type': 'STRING'}, + ], + }, + 'parameterValue': { + 'structValues': {'bar': 123, 'baz': 'abc'}, + }, + } + sub_1 = self._make_subparam('bar', 'INT64', 123) + sub_2 = self._make_subparam('baz', 'STRING', 'abc') + klass = self._get_target_class() + param = klass.positional(sub_1, sub_2) + self.assertEqual(param.to_api_repr(), EXPECTED) + + +class Test_QueryParametersProperty(unittest.TestCase): + + @staticmethod + def _get_target_class(): + from google.cloud.bigquery._helpers import QueryParametersProperty + return QueryParametersProperty + + def _make_one(self, *args, **kw): + return self._get_target_class()(*args, **kw) + + def _descriptor_and_klass(self): + descriptor = self._make_one() + + class _Test(object): + _query_parameters = () + query_parameters = descriptor + + return descriptor, _Test + + def test_class_getter(self): + descriptor, klass = self._descriptor_and_klass() + self.assertIs(klass.query_parameters, descriptor) + + def test_instance_getter_empty(self): + _, klass = self._descriptor_and_klass() + instance = klass() + self.assertEqual(instance.query_parameters, []) + + def test_instance_getter_w_non_empty_list(self): + from google.cloud.bigquery._helpers import ScalarQueryParameter + query_parameters = [ScalarQueryParameter("foo", 'INT64', 123)] + _, klass = self._descriptor_and_klass() + instance = klass() + instance._query_parameters = tuple(query_parameters) + + self.assertEqual(instance.query_parameters, query_parameters) + + def test_instance_setter_w_empty_list(self): + from google.cloud.bigquery._helpers import ScalarQueryParameter + query_parameters = [ScalarQueryParameter("foo", 'INT64', 123)] + _, klass = self._descriptor_and_klass() + instance = klass() + instance._query_parameters = query_parameters + + instance.query_parameters = [] + + self.assertEqual(instance.query_parameters, []) + + def test_instance_setter_w_valid_udf(self): + from google.cloud.bigquery._helpers import ScalarQueryParameter + query_parameters = [ScalarQueryParameter("foo", 'INT64', 123)] + _, klass = self._descriptor_and_klass() + instance = klass() + + instance.query_parameters = query_parameters + + self.assertEqual(instance.query_parameters, query_parameters) + + def test_instance_setter_w_bad_udfs(self): + _, klass = self._descriptor_and_klass() + instance = klass() + + with self.assertRaises(ValueError): + instance.query_parameters = ["foo"] + + self.assertEqual(instance.query_parameters, []) + + class _Field(object): def __init__(self, mode, name='unknown', field_type='UNKNOWN', fields=()): diff --git a/bigquery/unit_tests/test_client.py b/bigquery/unit_tests/test_client.py index 61ad81227aee..45f49b0f831e 100644 --- a/bigquery/unit_tests/test_client.py +++ b/bigquery/unit_tests/test_client.py @@ -470,7 +470,7 @@ def test_extract_table_to_storage(self): self.assertEqual(job.source, source) self.assertEqual(list(job.destination_uris), [DESTINATION]) - def test_run_async_query(self): + def test_run_async_query_defaults(self): from google.cloud.bigquery.job import QueryJob PROJECT = 'PROJECT' JOB = 'job_name' @@ -483,19 +483,96 @@ def test_run_async_query(self): self.assertIs(job._client, client) self.assertEqual(job.name, JOB) self.assertEqual(job.query, QUERY) + self.assertEqual(job.udf_resources, []) + self.assertEqual(job.query_parameters, []) - def test_run_sync_query(self): - from google.cloud.bigquery.query import QueryResults + def test_run_async_w_udf_resources(self): + from google.cloud.bigquery._helpers import UDFResource + from google.cloud.bigquery.job import QueryJob + RESOURCE_URI = 'gs://some-bucket/js/lib.js' + PROJECT = 'PROJECT' + JOB = 'job_name' + QUERY = 'select count(*) from persons' + creds = _Credentials() + http = object() + client = self._make_one(project=PROJECT, credentials=creds, http=http) + udf_resources = [UDFResource("resourceUri", RESOURCE_URI)] + job = client.run_async_query(JOB, QUERY, udf_resources=udf_resources) + self.assertIsInstance(job, QueryJob) + self.assertIs(job._client, client) + self.assertEqual(job.name, JOB) + self.assertEqual(job.query, QUERY) + self.assertEqual(job.udf_resources, udf_resources) + self.assertEqual(job.query_parameters, []) + + def test_run_async_w_query_parameters(self): + from google.cloud.bigquery._helpers import ScalarQueryParameter + from google.cloud.bigquery.job import QueryJob PROJECT = 'PROJECT' + JOB = 'job_name' QUERY = 'select count(*) from persons' creds = _Credentials() http = object() client = self._make_one(project=PROJECT, credentials=creds, http=http) - job = client.run_sync_query(QUERY) - self.assertIsInstance(job, QueryResults) + query_parameters = [ScalarQueryParameter('foo', 'INT64', 123)] + job = client.run_async_query(JOB, QUERY, + query_parameters=query_parameters) + self.assertIsInstance(job, QueryJob) self.assertIs(job._client, client) - self.assertIsNone(job.name) + self.assertEqual(job.name, JOB) self.assertEqual(job.query, QUERY) + self.assertEqual(job.udf_resources, []) + self.assertEqual(job.query_parameters, query_parameters) + + def test_run_sync_query_defaults(self): + from google.cloud.bigquery.query import QueryResults + PROJECT = 'PROJECT' + QUERY = 'select count(*) from persons' + creds = _Credentials() + http = object() + client = self._make_one(project=PROJECT, credentials=creds, http=http) + query = client.run_sync_query(QUERY) + self.assertIsInstance(query, QueryResults) + self.assertIs(query._client, client) + self.assertIsNone(query.name) + self.assertEqual(query.query, QUERY) + self.assertEqual(query.udf_resources, []) + self.assertEqual(query.query_parameters, []) + + def test_run_sync_query_w_udf_resources(self): + from google.cloud.bigquery._helpers import UDFResource + from google.cloud.bigquery.query import QueryResults + RESOURCE_URI = 'gs://some-bucket/js/lib.js' + PROJECT = 'PROJECT' + QUERY = 'select count(*) from persons' + creds = _Credentials() + http = object() + client = self._make_one(project=PROJECT, credentials=creds, http=http) + udf_resources = [UDFResource("resourceUri", RESOURCE_URI)] + query = client.run_sync_query(QUERY, udf_resources=udf_resources) + self.assertIsInstance(query, QueryResults) + self.assertIs(query._client, client) + self.assertIsNone(query.name) + self.assertEqual(query.query, QUERY) + self.assertEqual(query.udf_resources, udf_resources) + self.assertEqual(query.query_parameters, []) + + def test_run_sync_query_w_query_parameters(self): + from google.cloud.bigquery._helpers import ScalarQueryParameter + from google.cloud.bigquery.query import QueryResults + PROJECT = 'PROJECT' + QUERY = 'select count(*) from persons' + creds = _Credentials() + http = object() + client = self._make_one(project=PROJECT, credentials=creds, http=http) + query_parameters = [ScalarQueryParameter('foo', 'INT64', 123)] + query = client.run_sync_query(QUERY, query_parameters=query_parameters) + self.assertIsInstance(query, QueryResults) + self.assertIs(query._client, client) + self.assertIsNone(query.name) + self.assertEqual(query.query, QUERY) + self.assertEqual(query.udf_resources, []) + self.assertEqual(query.query_parameters, query_parameters) class _Credentials(object): diff --git a/bigquery/unit_tests/test_job.py b/bigquery/unit_tests/test_job.py index c73715262ba4..84ee25418491 100644 --- a/bigquery/unit_tests/test_job.py +++ b/bigquery/unit_tests/test_job.py @@ -426,7 +426,7 @@ def test_begin_w_already_running(self): job.begin() def test_begin_w_bound_client(self): - PATH = 'projects/%s/jobs' % self.PROJECT + PATH = '/projects/%s/jobs' % (self.PROJECT,) RESOURCE = self._makeResource() # Ensure None for missing server-set props del RESOURCE['statistics']['creationTime'] @@ -443,7 +443,7 @@ def test_begin_w_bound_client(self): self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) SENT = { 'jobReference': { 'projectId': self.PROJECT, @@ -465,7 +465,7 @@ def test_begin_w_bound_client(self): def test_begin_w_alternate_client(self): from google.cloud.bigquery.schema import SchemaField - PATH = 'projects/%s/jobs' % self.PROJECT + PATH = '/projects/%s/jobs' % (self.PROJECT,) RESOURCE = self._makeResource(ended=True) LOAD_CONFIGURATION = { 'sourceUris': [self.SOURCE1], @@ -519,7 +519,7 @@ def test_begin_w_alternate_client(self): self.assertEqual(len(conn2._requested), 1) req = conn2._requested[0] self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) SENT = { 'jobReference': { 'projectId': self.PROJECT, @@ -533,7 +533,7 @@ def test_begin_w_alternate_client(self): self._verifyResourceProperties(job, RESOURCE) def test_exists_miss_w_bound_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) conn = _Connection() client = _Client(project=self.PROJECT, connection=conn) table = _Table() @@ -544,11 +544,11 @@ def test_exists_miss_w_bound_client(self): self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self.assertEqual(req['query_params'], {'fields': 'id'}) def test_exists_hit_w_alternate_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) conn2 = _Connection({}) @@ -562,11 +562,11 @@ def test_exists_hit_w_alternate_client(self): self.assertEqual(len(conn2._requested), 1) req = conn2._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self.assertEqual(req['query_params'], {'fields': 'id'}) def test_reload_w_bound_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) RESOURCE = self._makeResource() conn = _Connection(RESOURCE) client = _Client(project=self.PROJECT, connection=conn) @@ -578,11 +578,11 @@ def test_reload_w_bound_client(self): self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self._verifyResourceProperties(job, RESOURCE) def test_reload_w_alternate_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) RESOURCE = self._makeResource() conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) @@ -597,11 +597,11 @@ def test_reload_w_alternate_client(self): self.assertEqual(len(conn2._requested), 1) req = conn2._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self._verifyResourceProperties(job, RESOURCE) def test_cancel_w_bound_client(self): - PATH = 'projects/%s/jobs/%s/cancel' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s/cancel' % (self.PROJECT, self.JOB_NAME) RESOURCE = self._makeResource(ended=True) RESPONSE = {'job': RESOURCE} conn = _Connection(RESPONSE) @@ -614,11 +614,11 @@ def test_cancel_w_bound_client(self): self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self._verifyResourceProperties(job, RESOURCE) def test_cancel_w_alternate_client(self): - PATH = 'projects/%s/jobs/%s/cancel' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s/cancel' % (self.PROJECT, self.JOB_NAME) RESOURCE = self._makeResource(ended=True) RESPONSE = {'job': RESOURCE} conn1 = _Connection() @@ -634,7 +634,7 @@ def test_cancel_w_alternate_client(self): self.assertEqual(len(conn2._requested), 1) req = conn2._requested[0] self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self._verifyResourceProperties(job, RESOURCE) @@ -773,7 +773,7 @@ def test_from_api_repr_w_properties(self): self._verifyResourceProperties(dataset, RESOURCE) def test_begin_w_bound_client(self): - PATH = 'projects/%s/jobs' % self.PROJECT + PATH = '/projects/%s/jobs' % (self.PROJECT,) RESOURCE = self._makeResource() # Ensure None for missing server-set props del RESOURCE['statistics']['creationTime'] @@ -791,7 +791,7 @@ def test_begin_w_bound_client(self): self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) SENT = { 'jobReference': { 'projectId': self.PROJECT, @@ -816,7 +816,7 @@ def test_begin_w_bound_client(self): self._verifyResourceProperties(job, RESOURCE) def test_begin_w_alternate_client(self): - PATH = 'projects/%s/jobs' % self.PROJECT + PATH = '/projects/%s/jobs' % (self.PROJECT,) RESOURCE = self._makeResource(ended=True) COPY_CONFIGURATION = { 'sourceTables': [{ @@ -850,7 +850,7 @@ def test_begin_w_alternate_client(self): self.assertEqual(len(conn2._requested), 1) req = conn2._requested[0] self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) SENT = { 'jobReference': { 'projectId': self.PROJECT, @@ -864,7 +864,7 @@ def test_begin_w_alternate_client(self): self._verifyResourceProperties(job, RESOURCE) def test_exists_miss_w_bound_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) conn = _Connection() client = _Client(project=self.PROJECT, connection=conn) source = _Table(self.SOURCE_TABLE) @@ -876,11 +876,11 @@ def test_exists_miss_w_bound_client(self): self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self.assertEqual(req['query_params'], {'fields': 'id'}) def test_exists_hit_w_alternate_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) conn2 = _Connection({}) @@ -895,11 +895,11 @@ def test_exists_hit_w_alternate_client(self): self.assertEqual(len(conn2._requested), 1) req = conn2._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self.assertEqual(req['query_params'], {'fields': 'id'}) def test_reload_w_bound_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) RESOURCE = self._makeResource() conn = _Connection(RESOURCE) client = _Client(project=self.PROJECT, connection=conn) @@ -912,11 +912,11 @@ def test_reload_w_bound_client(self): self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self._verifyResourceProperties(job, RESOURCE) def test_reload_w_alternate_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) RESOURCE = self._makeResource() conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) @@ -932,7 +932,7 @@ def test_reload_w_alternate_client(self): self.assertEqual(len(conn2._requested), 1) req = conn2._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self._verifyResourceProperties(job, RESOURCE) @@ -1071,7 +1071,7 @@ def test_from_api_repr_w_properties(self): self._verifyResourceProperties(dataset, RESOURCE) def test_begin_w_bound_client(self): - PATH = 'projects/%s/jobs' % self.PROJECT + PATH = '/projects/%s/jobs' % (self.PROJECT,) RESOURCE = self._makeResource() # Ensure None for missing server-set props del RESOURCE['statistics']['creationTime'] @@ -1089,7 +1089,7 @@ def test_begin_w_bound_client(self): self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) SENT = { 'jobReference': { 'projectId': self.PROJECT, @@ -1110,7 +1110,7 @@ def test_begin_w_bound_client(self): self._verifyResourceProperties(job, RESOURCE) def test_begin_w_alternate_client(self): - PATH = 'projects/%s/jobs' % self.PROJECT + PATH = '/projects/%s/jobs' % (self.PROJECT,) RESOURCE = self._makeResource(ended=True) EXTRACT_CONFIGURATION = { 'sourceTable': { @@ -1144,7 +1144,7 @@ def test_begin_w_alternate_client(self): self.assertEqual(len(conn2._requested), 1) req = conn2._requested[0] self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) SENT = { 'jobReference': { 'projectId': self.PROJECT, @@ -1158,7 +1158,7 @@ def test_begin_w_alternate_client(self): self._verifyResourceProperties(job, RESOURCE) def test_exists_miss_w_bound_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) conn = _Connection() client = _Client(project=self.PROJECT, connection=conn) source = _Table(self.SOURCE_TABLE) @@ -1170,11 +1170,11 @@ def test_exists_miss_w_bound_client(self): self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self.assertEqual(req['query_params'], {'fields': 'id'}) def test_exists_hit_w_alternate_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) conn2 = _Connection({}) @@ -1189,11 +1189,11 @@ def test_exists_hit_w_alternate_client(self): self.assertEqual(len(conn2._requested), 1) req = conn2._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self.assertEqual(req['query_params'], {'fields': 'id'}) def test_reload_w_bound_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) RESOURCE = self._makeResource() conn = _Connection(RESOURCE) client = _Client(project=self.PROJECT, connection=conn) @@ -1206,11 +1206,11 @@ def test_reload_w_bound_client(self): self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self._verifyResourceProperties(job, RESOURCE) def test_reload_w_alternate_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) RESOURCE = self._makeResource() conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) @@ -1226,7 +1226,7 @@ def test_reload_w_alternate_client(self): self.assertEqual(len(conn2._requested), 1) req = conn2._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self._verifyResourceProperties(job, RESOURCE) @@ -1287,12 +1287,31 @@ def _verifyIntegerResourceProperties(self, job, config): else: self.assertIsNone(job.maximum_bytes_billed) + def _verify_udf_resources(self, job, config): + udf_resources = config.get('userDefinedFunctionResources', ()) + self.assertEqual(len(job.udf_resources), len(udf_resources)) + for found, expected in zip(job.udf_resources, udf_resources): + if 'resourceUri' in expected: + self.assertEqual(found.udf_type, 'resourceUri') + self.assertEqual(found.value, expected['resourceUri']) + else: + self.assertEqual(found.udf_type, 'inlineCode') + self.assertEqual(found.value, expected['inlineCode']) + + def _verifyQueryParameters(self, job, config): + query_parameters = config.get('queryParameters', ()) + self.assertEqual(len(job.query_parameters), len(query_parameters)) + for found, expected in zip(job.query_parameters, query_parameters): + self.assertEqual(found.to_api_repr(), expected) + def _verifyResourceProperties(self, job, resource): self._verifyReadonlyResourceProperties(job, resource) config = resource.get('configuration', {}).get('query') self._verifyBooleanResourceProperties(job, config) self._verifyIntegerResourceProperties(job, config) + self._verify_udf_resources(job, config) + self._verifyQueryParameters(job, config) self.assertEqual(job.query, config['query']) if 'createDisposition' in config: @@ -1330,7 +1349,7 @@ def _verifyResourceProperties(self, job, resource): else: self.assertIsNone(job.write_disposition) - def test_ctor(self): + def test_ctor_defaults(self): client = _Client(self.PROJECT) job = self._make_one(self.JOB_NAME, self.QUERY, client) self.assertEqual(job.query, self.QUERY) @@ -1356,6 +1375,23 @@ def test_ctor(self): self.assertIsNone(job.maximum_billing_tier) self.assertIsNone(job.maximum_bytes_billed) + def test_ctor_w_udf_resources(self): + from google.cloud.bigquery._helpers import UDFResource + RESOURCE_URI = 'gs://some-bucket/js/lib.js' + udf_resources = [UDFResource("resourceUri", RESOURCE_URI)] + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client, + udf_resources=udf_resources) + self.assertEqual(job.udf_resources, udf_resources) + + def test_ctor_w_query_parameters(self): + from google.cloud.bigquery._helpers import ScalarQueryParameter + query_parameters = [ScalarQueryParameter("foo", 'INT64', 123)] + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client, + query_parameters=query_parameters) + self.assertEqual(job.query_parameters, query_parameters) + def test_from_api_repr_missing_identity(self): self._setUpConstants() client = _Client(self.PROJECT) @@ -1418,7 +1454,7 @@ def test_results(self): self.assertIs(results._job, job) def test_begin_w_bound_client(self): - PATH = 'projects/%s/jobs' % self.PROJECT + PATH = '/projects/%s/jobs' % (self.PROJECT,) RESOURCE = self._makeResource() # Ensure None for missing server-set props del RESOURCE['statistics']['creationTime'] @@ -1434,7 +1470,7 @@ def test_begin_w_bound_client(self): self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) SENT = { 'jobReference': { 'projectId': self.PROJECT, @@ -1452,7 +1488,7 @@ def test_begin_w_bound_client(self): def test_begin_w_alternate_client(self): from google.cloud.bigquery.dataset import Dataset from google.cloud.bigquery.dataset import Table - PATH = 'projects/%s/jobs' % self.PROJECT + PATH = '/projects/%s/jobs' % (self.PROJECT,) TABLE = 'TABLE' DS_NAME = 'DATASET' RESOURCE = self._makeResource(ended=True) @@ -1507,7 +1543,7 @@ def test_begin_w_alternate_client(self): self.assertEqual(len(conn2._requested), 1) req = conn2._requested[0] self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) SENT = { 'jobReference': { 'projectId': self.PROJECT, @@ -1520,31 +1556,140 @@ def test_begin_w_alternate_client(self): self.assertEqual(req['data'], SENT) self._verifyResourceProperties(job, RESOURCE) - def test_begin_w_bound_client_and_udf(self): + def test_begin_w_udf(self): from google.cloud.bigquery._helpers import UDFResource RESOURCE_URI = 'gs://some-bucket/js/lib.js' - PATH = 'projects/%s/jobs' % self.PROJECT + INLINE_UDF_CODE = 'var someCode = "here";' + PATH = '/projects/%s/jobs' % (self.PROJECT,) + RESOURCE = self._makeResource() + # Ensure None for missing server-set props + del RESOURCE['statistics']['creationTime'] + del RESOURCE['etag'] + del RESOURCE['selfLink'] + del RESOURCE['user_email'] + RESOURCE['configuration']['query']['userDefinedFunctionResources'] = [ + {'resourceUri': RESOURCE_URI}, + {'inlineCode': INLINE_UDF_CODE}, + ] + conn = _Connection(RESOURCE) + client = _Client(project=self.PROJECT, connection=conn) + udf_resources = [ + UDFResource("resourceUri", RESOURCE_URI), + UDFResource("inlineCode", INLINE_UDF_CODE), + ] + job = self._make_one(self.JOB_NAME, self.QUERY, client, + udf_resources=udf_resources) + + job.begin() + + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], PATH) + self.assertEqual(job.udf_resources, udf_resources) + SENT = { + 'jobReference': { + 'projectId': self.PROJECT, + 'jobId': self.JOB_NAME, + }, + 'configuration': { + 'query': { + 'query': self.QUERY, + 'userDefinedFunctionResources': [ + {'resourceUri': RESOURCE_URI}, + {'inlineCode': INLINE_UDF_CODE}, + ] + }, + }, + } + self.assertEqual(req['data'], SENT) + self._verifyResourceProperties(job, RESOURCE) + + def test_begin_w_named_query_parameter(self): + from google.cloud.bigquery._helpers import ScalarQueryParameter + query_parameters = [ScalarQueryParameter('foo', 'INT64', 123)] + PATH = '/projects/%s/jobs' % (self.PROJECT,) RESOURCE = self._makeResource() # Ensure None for missing server-set props del RESOURCE['statistics']['creationTime'] del RESOURCE['etag'] del RESOURCE['selfLink'] del RESOURCE['user_email'] + config = RESOURCE['configuration']['query'] + config['parameterMode'] = 'NAMED' + config['queryParameters'] = [ + { + 'name': 'foo', + 'parameterType': { + 'type': 'INT64', + }, + 'parameterValue': { + 'value': 123, + }, + }, + ] + conn = _Connection(RESOURCE) + client = _Client(project=self.PROJECT, connection=conn) + job = self._make_one(self.JOB_NAME, self.QUERY, client, + query_parameters=query_parameters) + + job.begin() + + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], PATH) + self.assertEqual(job.query_parameters, query_parameters) + SENT = { + 'jobReference': { + 'projectId': self.PROJECT, + 'jobId': self.JOB_NAME, + }, + 'configuration': { + 'query': { + 'query': self.QUERY, + 'parameterMode': 'NAMED', + 'queryParameters': config['queryParameters'], + }, + }, + } + self.assertEqual(req['data'], SENT) + self._verifyResourceProperties(job, RESOURCE) + + def test_begin_w_positional_query_parameter(self): + from google.cloud.bigquery._helpers import ScalarQueryParameter + query_parameters = [ScalarQueryParameter.positional('INT64', 123)] + PATH = '/projects/%s/jobs' % (self.PROJECT,) + RESOURCE = self._makeResource() + # Ensure None for missing server-set props + del RESOURCE['statistics']['creationTime'] + del RESOURCE['etag'] + del RESOURCE['selfLink'] + del RESOURCE['user_email'] + config = RESOURCE['configuration']['query'] + config['parameterMode'] = 'POSITIONAL' + config['queryParameters'] = [ + { + 'parameterType': { + 'type': 'INT64', + }, + 'parameterValue': { + 'value': 123, + }, + }, + ] conn = _Connection(RESOURCE) client = _Client(project=self.PROJECT, connection=conn) job = self._make_one(self.JOB_NAME, self.QUERY, client, - udf_resources=[ - UDFResource("resourceUri", RESOURCE_URI) - ]) + query_parameters=query_parameters) job.begin() self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'POST') - self.assertEqual(req['path'], '/%s' % PATH) - self.assertEqual(job.udf_resources, - [UDFResource("resourceUri", RESOURCE_URI)]) + self.assertEqual(req['path'], PATH) + self.assertEqual(job.query_parameters, query_parameters) SENT = { 'jobReference': { 'projectId': self.PROJECT, @@ -1553,8 +1698,8 @@ def test_begin_w_bound_client_and_udf(self): 'configuration': { 'query': { 'query': self.QUERY, - 'userDefinedFunctionResources': - [{'resourceUri': RESOURCE_URI}] + 'parameterMode': 'POSITIONAL', + 'queryParameters': config['queryParameters'], }, }, } @@ -1562,7 +1707,7 @@ def test_begin_w_bound_client_and_udf(self): self._verifyResourceProperties(job, RESOURCE) def test_exists_miss_w_bound_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) conn = _Connection() client = _Client(project=self.PROJECT, connection=conn) job = self._make_one(self.JOB_NAME, self.QUERY, client) @@ -1572,11 +1717,11 @@ def test_exists_miss_w_bound_client(self): self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self.assertEqual(req['query_params'], {'fields': 'id'}) def test_exists_hit_w_alternate_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) conn1 = _Connection() client1 = _Client(project=self.PROJECT, connection=conn1) conn2 = _Connection({}) @@ -1589,13 +1734,13 @@ def test_exists_hit_w_alternate_client(self): self.assertEqual(len(conn2._requested), 1) req = conn2._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self.assertEqual(req['query_params'], {'fields': 'id'}) def test_reload_w_bound_client(self): from google.cloud.bigquery.dataset import Dataset from google.cloud.bigquery.dataset import Table - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) DS_NAME = 'DATASET' DEST_TABLE = 'dest_table' RESOURCE = self._makeResource() @@ -1614,11 +1759,11 @@ def test_reload_w_bound_client(self): self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self._verifyResourceProperties(job, RESOURCE) def test_reload_w_alternate_client(self): - PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME) DS_NAME = 'DATASET' DEST_TABLE = 'dest_table' RESOURCE = self._makeResource() @@ -1640,7 +1785,7 @@ def test_reload_w_alternate_client(self): self.assertEqual(len(conn2._requested), 1) req = conn2._requested[0] self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/%s' % PATH) + self.assertEqual(req['path'], PATH) self._verifyResourceProperties(job, RESOURCE) diff --git a/bigquery/unit_tests/test_query.py b/bigquery/unit_tests/test_query.py index 43aedf334ce0..3dfc795a6072 100644 --- a/bigquery/unit_tests/test_query.py +++ b/bigquery/unit_tests/test_query.py @@ -100,6 +100,23 @@ def _verifyRows(self, query, resource): self.assertEqual(f_row, tuple([cell['v'] for cell in e_row['f']])) + def _verify_udf_resources(self, query, resource): + udf_resources = resource.get('userDefinedFunctionResources', ()) + self.assertEqual(len(query.udf_resources), len(udf_resources)) + for found, expected in zip(query.udf_resources, udf_resources): + if 'resourceUri' in expected: + self.assertEqual(found.udf_type, 'resourceUri') + self.assertEqual(found.value, expected['resourceUri']) + else: + self.assertEqual(found.udf_type, 'inlineCode') + self.assertEqual(found.value, expected['inlineCode']) + + def _verifyQueryParameters(self, query, resource): + query_parameters = resource.get('queryParameters', ()) + self.assertEqual(len(query.query_parameters), len(query_parameters)) + for found, expected in zip(query.query_parameters, query_parameters): + self.assertEqual(found.to_api_repr(), expected) + def _verifyResourceProperties(self, query, resource): self.assertEqual(query.cache_hit, resource.get('cacheHit')) self.assertEqual(query.complete, resource.get('jobComplete')) @@ -114,10 +131,12 @@ def _verifyResourceProperties(self, query, resource): else: self.assertIsNone(query.name) + self._verify_udf_resources(query, resource) + self._verifyQueryParameters(query, resource) self._verifySchema(query, resource) self._verifyRows(query, resource) - def test_ctor(self): + def test_ctor_defaults(self): client = _Client(self.PROJECT) query = self._make_one(self.QUERY, client) self.assertEqual(query.query, self.QUERY) @@ -128,10 +147,12 @@ def test_ctor(self): self.assertIsNone(query.errors) self.assertIsNone(query.name) self.assertIsNone(query.page_token) + self.assertEqual(query.query_parameters, []) self.assertEqual(query.rows, []) self.assertIsNone(query.schema) self.assertIsNone(query.total_rows) self.assertIsNone(query.total_bytes_processed) + self.assertEqual(query.udf_resources, []) self.assertIsNone(query.default_dataset) self.assertIsNone(query.max_results) @@ -139,6 +160,22 @@ def test_ctor(self): self.assertIsNone(query.use_query_cache) self.assertIsNone(query.use_legacy_sql) + def test_ctor_w_udf_resources(self): + from google.cloud.bigquery._helpers import UDFResource + RESOURCE_URI = 'gs://some-bucket/js/lib.js' + udf_resources = [UDFResource("resourceUri", RESOURCE_URI)] + client = _Client(self.PROJECT) + query = self._make_one(self.QUERY, client, udf_resources=udf_resources) + self.assertEqual(query.udf_resources, udf_resources) + + def test_ctor_w_query_parameters(self): + from google.cloud.bigquery._helpers import ScalarQueryParameter + query_parameters = [ScalarQueryParameter("foo", 'INT64', 123)] + client = _Client(self.PROJECT) + query = self._make_one(self.QUERY, client, + query_parameters=query_parameters) + self.assertEqual(query.query_parameters, query_parameters) + def test_from_query_job(self): from google.cloud.bigquery.dataset import Dataset from google.cloud.bigquery.job import QueryJob @@ -293,6 +330,9 @@ def test_run_w_inline_udf(self): INLINE_UDF_CODE = 'var someCode = "here";' PATH = 'projects/%s/queries' % self.PROJECT RESOURCE = self._makeResource(complete=False) + RESOURCE['userDefinedFunctionResources'] = [ + {'inlineCode': INLINE_UDF_CODE}, + ] conn = _Connection(RESOURCE) client = _Client(project=self.PROJECT, connection=conn) query = self._make_one(self.QUERY, client) @@ -315,6 +355,9 @@ def test_run_w_udf_resource_uri(self): RESOURCE_URI = 'gs://some-bucket/js/lib.js' PATH = 'projects/%s/queries' % self.PROJECT RESOURCE = self._makeResource(complete=False) + RESOURCE['userDefinedFunctionResources'] = [ + {'resourceUri': RESOURCE_URI}, + ] conn = _Connection(RESOURCE) client = _Client(project=self.PROJECT, connection=conn) query = self._make_one(self.QUERY, client) @@ -338,6 +381,10 @@ def test_run_w_mixed_udfs(self): INLINE_UDF_CODE = 'var someCode = "here";' PATH = 'projects/%s/queries' % self.PROJECT RESOURCE = self._makeResource(complete=False) + RESOURCE['userDefinedFunctionResources'] = [ + {'resourceUri': RESOURCE_URI}, + {'inlineCode': INLINE_UDF_CODE}, + ] conn = _Connection(RESOURCE) client = _Client(project=self.PROJECT, connection=conn) query = self._make_one(self.QUERY, client) @@ -360,6 +407,75 @@ def test_run_w_mixed_udfs(self): self.assertEqual(req['data'], SENT) self._verifyResourceProperties(query, RESOURCE) + def test_run_w_named_query_paramter(self): + from google.cloud.bigquery._helpers import ScalarQueryParameter + PATH = 'projects/%s/queries' % self.PROJECT + RESOURCE = self._makeResource(complete=False) + RESOURCE['parameterMode'] = 'NAMED' + RESOURCE['queryParameters'] = [ + { + 'name': 'foo', + 'parameterType': { + 'type': 'INT64', + }, + 'parameterValue': { + 'value': 123, + }, + }, + ] + query_parameters = [ScalarQueryParameter('foo', 'INT64', 123)] + conn = _Connection(RESOURCE) + client = _Client(project=self.PROJECT, connection=conn) + query = self._make_one(self.QUERY, client, + query_parameters=query_parameters) + query.run() + + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s' % PATH) + SENT = { + 'query': self.QUERY, + 'parameterMode': 'NAMED', + 'queryParameters': RESOURCE['queryParameters'], + } + self.assertEqual(req['data'], SENT) + self._verifyResourceProperties(query, RESOURCE) + + def test_run_w_positional_query_paramter(self): + from google.cloud.bigquery._helpers import ScalarQueryParameter + PATH = 'projects/%s/queries' % self.PROJECT + RESOURCE = self._makeResource(complete=False) + RESOURCE['parameterMode'] = 'POSITIONAL' + RESOURCE['queryParameters'] = [ + { + 'parameterType': { + 'type': 'INT64', + }, + 'parameterValue': { + 'value': 123, + }, + }, + ] + query_parameters = [ScalarQueryParameter.positional('INT64', 123)] + conn = _Connection(RESOURCE) + client = _Client(project=self.PROJECT, connection=conn) + query = self._make_one(self.QUERY, client, + query_parameters=query_parameters) + query.run() + + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s' % PATH) + SENT = { + 'query': self.QUERY, + 'parameterMode': 'POSITIONAL', + 'queryParameters': RESOURCE['queryParameters'], + } + self.assertEqual(req['data'], SENT) + self._verifyResourceProperties(query, RESOURCE) + def test_fetch_data_query_not_yet_run(self): conn = _Connection() client = _Client(project=self.PROJECT, connection=conn) diff --git a/docs/bigquery-usage.rst b/docs/bigquery-usage.rst index 937fbf73f315..aaefcaaf0455 100644 --- a/docs/bigquery-usage.rst +++ b/docs/bigquery-usage.rst @@ -209,6 +209,12 @@ Run a query which can be expected to complete within bounded time: :start-after: [START client_run_sync_query] :end-before: [END client_run_sync_query] +Run a query using a named query parameter: + +.. literalinclude:: bigquery_snippets.py + :start-after: [START client_run_sync_query_w_param] + :end-before: [END client_run_sync_query_w_param] + If the rows returned by the query do not fit into the initial response, then we need to fetch the remaining rows via :meth:`~google.cloud.bigquery.query.QueryResults.fetch_data`: diff --git a/docs/bigquery_snippets.py b/docs/bigquery_snippets.py index 4231e64eae91..2bba9acece81 100644 --- a/docs/bigquery_snippets.py +++ b/docs/bigquery_snippets.py @@ -461,13 +461,9 @@ def do_something_with(_): pass # [START client_list_jobs] - jobs, token = client.list_jobs() # API request - while True: - for job in jobs: - do_something_with(job) - if token is None: - break - jobs, token = client.list_jobs(page_token=token) # API request + job_iterator = client.list_jobs() + for job in job_iterator: # API request(s) + do_something_with(job) # [END client_list_jobs] @@ -489,6 +485,30 @@ def client_run_sync_query(client, _): # [END client_run_sync_query] +@snippet +def client_run_sync_query_w_param(client, _): + """Run a synchronous query using a query parameter""" + QUERY_W_PARAM = ( + 'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` ' + 'WHERE state = @state') + LIMIT = 100 + LIMITED = '%s LIMIT %d' % (QUERY_W_PARAM, LIMIT) + TIMEOUT_MS = 1000 + + # [START client_run_sync_query_w_param] + from google.cloud.bigquery import ScalarQueryParameter + param = ScalarQueryParameter('state', 'STRING', 'TX') + query = client.run_sync_query(LIMITED, query_parameters=[param]) + query.use_legacy_sql = False + query.timeout_ms = TIMEOUT_MS + query.run() # API request + + assert query.complete + assert len(query.rows) == LIMIT + assert [field.name for field in query.schema] == ['name'] + # [END client_run_sync_query_w_param] + + @snippet def client_run_sync_query_paged(client, _): """Run a synchronous query with paged results."""