diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index af8eaf5a7..a53819cde 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -1976,15 +1976,8 @@ def create_job( ) destination = _get_sub_prop(job_config, ["copy", "destinationTable"]) destination = TableReference.from_api_repr(destination) - sources = [] - source_configs = _get_sub_prop(job_config, ["copy", "sourceTables"]) - if source_configs is None: - source_configs = [_get_sub_prop(job_config, ["copy", "sourceTable"])] - for source_config in source_configs: - table_ref = TableReference.from_api_repr(source_config) - sources.append(table_ref) return self.copy_table( - sources, + [], # Source table(s) already in job_config resource. destination, job_config=typing.cast(CopyJobConfig, copy_job_config), retry=retry, diff --git a/google/cloud/bigquery/job/base.py b/google/cloud/bigquery/job/base.py index 86701e295..55e80b2eb 100644 --- a/google/cloud/bigquery/job/base.py +++ b/google/cloud/bigquery/job/base.py @@ -19,7 +19,7 @@ import http import threading import typing -from typing import Dict, Optional, Sequence +from typing import ClassVar, Dict, Optional, Sequence from google.api_core import exceptions import google.api_core.future.polling @@ -150,6 +150,182 @@ def _from_api_repr(cls, resource): return job_ref +class _JobConfig(object): + """Abstract base class for job configuration objects. + + Args: + job_type (str): The key to use for the job configuration. + """ + + def __init__(self, job_type, **kwargs): + self._job_type = job_type + self._properties = {job_type: {}} + for prop, val in kwargs.items(): + setattr(self, prop, val) + + def __setattr__(self, name, value): + """Override to be able to raise error if an unknown property is being set""" + if not name.startswith("_") and not hasattr(type(self), name): + raise AttributeError( + "Property {} is unknown for {}.".format(name, type(self)) + ) + super(_JobConfig, self).__setattr__(name, value) + + @property + def labels(self): + """Dict[str, str]: Labels for the job. + + This method always returns a dict. Once a job has been created on the + server, its labels cannot be modified anymore. + + Raises: + ValueError: If ``value`` type is invalid. + """ + return self._properties.setdefault("labels", {}) + + @labels.setter + def labels(self, value): + if not isinstance(value, dict): + raise ValueError("Pass a dict") + self._properties["labels"] = value + + def _get_sub_prop(self, key, default=None): + """Get a value in the ``self._properties[self._job_type]`` dictionary. + + Most job properties are inside the dictionary related to the job type + (e.g. 'copy', 'extract', 'load', 'query'). Use this method to access + those properties:: + + self._get_sub_prop('destinationTable') + + This is equivalent to using the ``_helpers._get_sub_prop`` function:: + + _helpers._get_sub_prop( + self._properties, ['query', 'destinationTable']) + + Args: + key (str): + Key for the value to get in the + ``self._properties[self._job_type]`` dictionary. + default (Optional[object]): + Default value to return if the key is not found. + Defaults to :data:`None`. + + Returns: + object: The value if present or the default. + """ + return _helpers._get_sub_prop( + self._properties, [self._job_type, key], default=default + ) + + def _set_sub_prop(self, key, value): + """Set a value in the ``self._properties[self._job_type]`` dictionary. + + Most job properties are inside the dictionary related to the job type + (e.g. 'copy', 'extract', 'load', 'query'). Use this method to set + those properties:: + + self._set_sub_prop('useLegacySql', False) + + This is equivalent to using the ``_helper._set_sub_prop`` function:: + + _helper._set_sub_prop( + self._properties, ['query', 'useLegacySql'], False) + + Args: + key (str): + Key to set in the ``self._properties[self._job_type]`` + dictionary. + value (object): Value to set. + """ + _helpers._set_sub_prop(self._properties, [self._job_type, key], value) + + def _del_sub_prop(self, key): + """Remove ``key`` from the ``self._properties[self._job_type]`` dict. + + Most job properties are inside the dictionary related to the job type + (e.g. 'copy', 'extract', 'load', 'query'). Use this method to clear + those properties:: + + self._del_sub_prop('useLegacySql') + + This is equivalent to using the ``_helper._del_sub_prop`` function:: + + _helper._del_sub_prop( + self._properties, ['query', 'useLegacySql']) + + Args: + key (str): + Key to remove in the ``self._properties[self._job_type]`` + dictionary. + """ + _helpers._del_sub_prop(self._properties, [self._job_type, key]) + + def to_api_repr(self) -> dict: + """Build an API representation of the job config. + + Returns: + Dict: A dictionary in the format used by the BigQuery API. + """ + return copy.deepcopy(self._properties) + + def _fill_from_default(self, default_job_config): + """Merge this job config with a default job config. + + The keys in this object take precedence over the keys in the default + config. The merge is done at the top-level as well as for keys one + level below the job type. + + Args: + default_job_config (google.cloud.bigquery.job._JobConfig): + The default job config that will be used to fill in self. + + Returns: + google.cloud.bigquery.job._JobConfig: A new (merged) job config. + """ + if self._job_type != default_job_config._job_type: + raise TypeError( + "attempted to merge two incompatible job types: " + + repr(self._job_type) + + ", " + + repr(default_job_config._job_type) + ) + + # cls is one of the job config subclasses that provides the job_type argument to + # this base class on instantiation, thus missing-parameter warning is a false + # positive here. + new_job_config = self.__class__() # pytype: disable=missing-parameter + + default_job_properties = copy.deepcopy(default_job_config._properties) + for key in self._properties: + if key != self._job_type: + default_job_properties[key] = self._properties[key] + + default_job_properties[self._job_type].update(self._properties[self._job_type]) + new_job_config._properties = default_job_properties + + return new_job_config + + @classmethod + def from_api_repr(cls, resource: dict) -> "_JobConfig": + """Factory: construct a job configuration given its API representation + + Args: + resource (Dict): + A job configuration in the same representation as is returned + from the API. + + Returns: + google.cloud.bigquery.job._JobConfig: Configuration parsed from ``resource``. + """ + # cls is one of the job config subclasses that provides the job_type argument to + # this base class on instantiation, thus missing-parameter warning is a false + # positive here. + job_config = cls() # type: ignore # pytype: disable=missing-parameter + job_config._properties = resource + return job_config + + class _AsyncJob(google.api_core.future.polling.PollingFuture): """Base class for asynchronous jobs. @@ -161,6 +337,9 @@ class _AsyncJob(google.api_core.future.polling.PollingFuture): Client which holds credentials and project configuration. """ + _JOB_TYPE = "unknown" + _CONFIG_CLASS: ClassVar + def __init__(self, job_id, client): super(_AsyncJob, self).__init__() @@ -176,6 +355,13 @@ def __init__(self, job_id, client): self._result_set = False self._completion_lock = threading.Lock() + @property + def configuration(self) -> _JobConfig: + """Job-type specific configurtion.""" + configuration = self._CONFIG_CLASS() + configuration._properties = self._properties.setdefault("configuration", {}) + return configuration + @property def job_id(self): """str: ID of the job.""" @@ -426,8 +612,7 @@ def _set_properties(self, api_response): api_response (Dict): response returned from an API call. """ cleaned = api_response.copy() - - statistics = cleaned.get("statistics", {}) + statistics = cleaned.setdefault("statistics", {}) if "creationTime" in statistics: statistics["creationTime"] = float(statistics["creationTime"]) if "startTime" in statistics: @@ -435,13 +620,7 @@ def _set_properties(self, api_response): if "endTime" in statistics: statistics["endTime"] = float(statistics["endTime"]) - # Save configuration to keep reference same in self._configuration. - cleaned_config = cleaned.pop("configuration", {}) - configuration = self._properties.pop("configuration", {}) - self._properties.clear() - self._properties.update(cleaned) - self._properties["configuration"] = configuration - self._properties["configuration"].update(cleaned_config) + self._properties = cleaned # For Future interface self._set_future_result() @@ -751,182 +930,6 @@ def __repr__(self): return result -class _JobConfig(object): - """Abstract base class for job configuration objects. - - Args: - job_type (str): The key to use for the job configuration. - """ - - def __init__(self, job_type, **kwargs): - self._job_type = job_type - self._properties = {job_type: {}} - for prop, val in kwargs.items(): - setattr(self, prop, val) - - def __setattr__(self, name, value): - """Override to be able to raise error if an unknown property is being set""" - if not name.startswith("_") and not hasattr(type(self), name): - raise AttributeError( - "Property {} is unknown for {}.".format(name, type(self)) - ) - super(_JobConfig, self).__setattr__(name, value) - - @property - def labels(self): - """Dict[str, str]: Labels for the job. - - This method always returns a dict. Once a job has been created on the - server, its labels cannot be modified anymore. - - Raises: - ValueError: If ``value`` type is invalid. - """ - return self._properties.setdefault("labels", {}) - - @labels.setter - def labels(self, value): - if not isinstance(value, dict): - raise ValueError("Pass a dict") - self._properties["labels"] = value - - def _get_sub_prop(self, key, default=None): - """Get a value in the ``self._properties[self._job_type]`` dictionary. - - Most job properties are inside the dictionary related to the job type - (e.g. 'copy', 'extract', 'load', 'query'). Use this method to access - those properties:: - - self._get_sub_prop('destinationTable') - - This is equivalent to using the ``_helpers._get_sub_prop`` function:: - - _helpers._get_sub_prop( - self._properties, ['query', 'destinationTable']) - - Args: - key (str): - Key for the value to get in the - ``self._properties[self._job_type]`` dictionary. - default (Optional[object]): - Default value to return if the key is not found. - Defaults to :data:`None`. - - Returns: - object: The value if present or the default. - """ - return _helpers._get_sub_prop( - self._properties, [self._job_type, key], default=default - ) - - def _set_sub_prop(self, key, value): - """Set a value in the ``self._properties[self._job_type]`` dictionary. - - Most job properties are inside the dictionary related to the job type - (e.g. 'copy', 'extract', 'load', 'query'). Use this method to set - those properties:: - - self._set_sub_prop('useLegacySql', False) - - This is equivalent to using the ``_helper._set_sub_prop`` function:: - - _helper._set_sub_prop( - self._properties, ['query', 'useLegacySql'], False) - - Args: - key (str): - Key to set in the ``self._properties[self._job_type]`` - dictionary. - value (object): Value to set. - """ - _helpers._set_sub_prop(self._properties, [self._job_type, key], value) - - def _del_sub_prop(self, key): - """Remove ``key`` from the ``self._properties[self._job_type]`` dict. - - Most job properties are inside the dictionary related to the job type - (e.g. 'copy', 'extract', 'load', 'query'). Use this method to clear - those properties:: - - self._del_sub_prop('useLegacySql') - - This is equivalent to using the ``_helper._del_sub_prop`` function:: - - _helper._del_sub_prop( - self._properties, ['query', 'useLegacySql']) - - Args: - key (str): - Key to remove in the ``self._properties[self._job_type]`` - dictionary. - """ - _helpers._del_sub_prop(self._properties, [self._job_type, key]) - - def to_api_repr(self) -> dict: - """Build an API representation of the job config. - - Returns: - Dict: A dictionary in the format used by the BigQuery API. - """ - return copy.deepcopy(self._properties) - - def _fill_from_default(self, default_job_config): - """Merge this job config with a default job config. - - The keys in this object take precedence over the keys in the default - config. The merge is done at the top-level as well as for keys one - level below the job type. - - Args: - default_job_config (google.cloud.bigquery.job._JobConfig): - The default job config that will be used to fill in self. - - Returns: - google.cloud.bigquery.job._JobConfig: A new (merged) job config. - """ - if self._job_type != default_job_config._job_type: - raise TypeError( - "attempted to merge two incompatible job types: " - + repr(self._job_type) - + ", " - + repr(default_job_config._job_type) - ) - - # cls is one of the job config subclasses that provides the job_type argument to - # this base class on instantiation, thus missing-parameter warning is a false - # positive here. - new_job_config = self.__class__() # pytype: disable=missing-parameter - - default_job_properties = copy.deepcopy(default_job_config._properties) - for key in self._properties: - if key != self._job_type: - default_job_properties[key] = self._properties[key] - - default_job_properties[self._job_type].update(self._properties[self._job_type]) - new_job_config._properties = default_job_properties - - return new_job_config - - @classmethod - def from_api_repr(cls, resource: dict) -> "_JobConfig": - """Factory: construct a job configuration given its API representation - - Args: - resource (Dict): - A job configuration in the same representation as is returned - from the API. - - Returns: - google.cloud.bigquery.job._JobConfig: Configuration parsed from ``resource``. - """ - # cls is one of the job config subclasses that provides the job_type argument to - # this base class on instantiation, thus missing-parameter warning is a false - # positive here. - job_config = cls() # type: ignore # pytype: disable=missing-parameter - job_config._properties = resource - return job_config - - class ScriptStackFrame(object): """Stack frame showing the line/column/procedure name where the current evaluation happened. diff --git a/google/cloud/bigquery/job/copy_.py b/google/cloud/bigquery/job/copy_.py index 9d7548ec5..5c52aeed6 100644 --- a/google/cloud/bigquery/job/copy_.py +++ b/google/cloud/bigquery/job/copy_.py @@ -14,6 +14,7 @@ """Classes for copy jobs.""" +import typing from typing import Optional from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration @@ -160,15 +161,13 @@ class CopyJob(_AsyncJob): """ _JOB_TYPE = "copy" + _CONFIG_CLASS = CopyJobConfig def __init__(self, job_id, sources, destination, client, job_config=None): super(CopyJob, self).__init__(job_id, client) - if not job_config: - job_config = CopyJobConfig() - - self._configuration = job_config - self._properties["configuration"] = job_config._properties + if job_config is not None: + self._properties["configuration"] = job_config._properties if destination: _helpers._set_sub_prop( @@ -185,6 +184,11 @@ def __init__(self, job_id, sources, destination, client, job_config=None): source_resources, ) + @property + def configuration(self) -> CopyJobConfig: + """The configuration for this copy job.""" + return typing.cast(CopyJobConfig, super().configuration) + @property def destination(self): """google.cloud.bigquery.table.TableReference: Table into which data @@ -223,14 +227,14 @@ def create_disposition(self): """See :attr:`google.cloud.bigquery.job.CopyJobConfig.create_disposition`. """ - return self._configuration.create_disposition + return self.configuration.create_disposition @property def write_disposition(self): """See :attr:`google.cloud.bigquery.job.CopyJobConfig.write_disposition`. """ - return self._configuration.write_disposition + return self.configuration.write_disposition @property def destination_encryption_configuration(self): @@ -243,7 +247,7 @@ def destination_encryption_configuration(self): See :attr:`google.cloud.bigquery.job.CopyJobConfig.destination_encryption_configuration`. """ - return self._configuration.destination_encryption_configuration + return self.configuration.destination_encryption_configuration def to_api_repr(self): """Generate a resource for :meth:`_begin`.""" diff --git a/google/cloud/bigquery/job/extract.py b/google/cloud/bigquery/job/extract.py index 52aa036c9..64ec39b76 100644 --- a/google/cloud/bigquery/job/extract.py +++ b/google/cloud/bigquery/job/extract.py @@ -14,6 +14,8 @@ """Classes for extract (export) jobs.""" +import typing + from google.cloud.bigquery import _helpers from google.cloud.bigquery.model import ModelReference from google.cloud.bigquery.table import Table @@ -125,15 +127,13 @@ class ExtractJob(_AsyncJob): """ _JOB_TYPE = "extract" + _CONFIG_CLASS = ExtractJobConfig def __init__(self, job_id, source, destination_uris, client, job_config=None): super(ExtractJob, self).__init__(job_id, client) - if job_config is None: - job_config = ExtractJobConfig() - - self._properties["configuration"] = job_config._properties - self._configuration = job_config + if job_config is not None: + self._properties["configuration"] = job_config._properties if source: source_ref = {"projectId": source.project, "datasetId": source.dataset_id} @@ -156,6 +156,11 @@ def __init__(self, job_id, source, destination_uris, client, job_config=None): destination_uris, ) + @property + def configuration(self) -> ExtractJobConfig: + """The configuration for this extract job.""" + return typing.cast(ExtractJobConfig, super().configuration) + @property def source(self): """Union[ \ @@ -189,28 +194,28 @@ def compression(self): """See :attr:`google.cloud.bigquery.job.ExtractJobConfig.compression`. """ - return self._configuration.compression + return self.configuration.compression @property def destination_format(self): """See :attr:`google.cloud.bigquery.job.ExtractJobConfig.destination_format`. """ - return self._configuration.destination_format + return self.configuration.destination_format @property def field_delimiter(self): """See :attr:`google.cloud.bigquery.job.ExtractJobConfig.field_delimiter`. """ - return self._configuration.field_delimiter + return self.configuration.field_delimiter @property def print_header(self): """See :attr:`google.cloud.bigquery.job.ExtractJobConfig.print_header`. """ - return self._configuration.print_header + return self.configuration.print_header @property def destination_uri_file_counts(self): diff --git a/google/cloud/bigquery/job/load.py b/google/cloud/bigquery/job/load.py index 7481cb378..6b6c8bfd9 100644 --- a/google/cloud/bigquery/job/load.py +++ b/google/cloud/bigquery/job/load.py @@ -14,6 +14,7 @@ """Classes for load jobs.""" +import typing from typing import FrozenSet, List, Iterable, Optional from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration @@ -605,15 +606,13 @@ class LoadJob(_AsyncJob): """ _JOB_TYPE = "load" + _CONFIG_CLASS = LoadJobConfig def __init__(self, job_id, source_uris, destination, client, job_config=None): super(LoadJob, self).__init__(job_id, client) - if not job_config: - job_config = LoadJobConfig() - - self._configuration = job_config - self._properties["configuration"] = job_config._properties + if job_config is not None: + self._properties["configuration"] = job_config._properties if source_uris is not None: _helpers._set_sub_prop( @@ -627,6 +626,11 @@ def __init__(self, job_id, source_uris, destination, client, job_config=None): destination.to_api_repr(), ) + @property + def configuration(self) -> LoadJobConfig: + """The configuration for this load job.""" + return typing.cast(LoadJobConfig, super().configuration) + @property def destination(self): """google.cloud.bigquery.table.TableReference: table where loaded rows are written @@ -654,21 +658,21 @@ def allow_jagged_rows(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.allow_jagged_rows`. """ - return self._configuration.allow_jagged_rows + return self.configuration.allow_jagged_rows @property def allow_quoted_newlines(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.allow_quoted_newlines`. """ - return self._configuration.allow_quoted_newlines + return self.configuration.allow_quoted_newlines @property def autodetect(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.autodetect`. """ - return self._configuration.autodetect + return self.configuration.autodetect @property def connection_properties(self) -> List[ConnectionProperty]: @@ -677,14 +681,14 @@ def connection_properties(self) -> List[ConnectionProperty]: .. versionadded:: 3.7.0 """ - return self._configuration.connection_properties + return self.configuration.connection_properties @property def create_disposition(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.create_disposition`. """ - return self._configuration.create_disposition + return self.configuration.create_disposition @property def create_session(self) -> Optional[bool]: @@ -693,84 +697,84 @@ def create_session(self) -> Optional[bool]: .. versionadded:: 3.7.0 """ - return self._configuration.create_session + return self.configuration.create_session @property def encoding(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.encoding`. """ - return self._configuration.encoding + return self.configuration.encoding @property def field_delimiter(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.field_delimiter`. """ - return self._configuration.field_delimiter + return self.configuration.field_delimiter @property def ignore_unknown_values(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.ignore_unknown_values`. """ - return self._configuration.ignore_unknown_values + return self.configuration.ignore_unknown_values @property def max_bad_records(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.max_bad_records`. """ - return self._configuration.max_bad_records + return self.configuration.max_bad_records @property def null_marker(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.null_marker`. """ - return self._configuration.null_marker + return self.configuration.null_marker @property def quote_character(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.quote_character`. """ - return self._configuration.quote_character + return self.configuration.quote_character @property def reference_file_schema_uri(self): """See: attr:`google.cloud.bigquery.job.LoadJobConfig.reference_file_schema_uri`. """ - return self._configuration.reference_file_schema_uri + return self.configuration.reference_file_schema_uri @property def skip_leading_rows(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.skip_leading_rows`. """ - return self._configuration.skip_leading_rows + return self.configuration.skip_leading_rows @property def source_format(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.source_format`. """ - return self._configuration.source_format + return self.configuration.source_format @property def write_disposition(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.write_disposition`. """ - return self._configuration.write_disposition + return self.configuration.write_disposition @property def schema(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.schema`. """ - return self._configuration.schema + return self.configuration.schema @property def destination_encryption_configuration(self): @@ -783,7 +787,7 @@ def destination_encryption_configuration(self): See :attr:`google.cloud.bigquery.job.LoadJobConfig.destination_encryption_configuration`. """ - return self._configuration.destination_encryption_configuration + return self.configuration.destination_encryption_configuration @property def destination_table_description(self): @@ -792,7 +796,7 @@ def destination_table_description(self): See: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#DestinationTableProperties.FIELDS.description """ - return self._configuration.destination_table_description + return self.configuration.destination_table_description @property def destination_table_friendly_name(self): @@ -801,42 +805,42 @@ def destination_table_friendly_name(self): See: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#DestinationTableProperties.FIELDS.friendly_name """ - return self._configuration.destination_table_friendly_name + return self.configuration.destination_table_friendly_name @property def range_partitioning(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.range_partitioning`. """ - return self._configuration.range_partitioning + return self.configuration.range_partitioning @property def time_partitioning(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.time_partitioning`. """ - return self._configuration.time_partitioning + return self.configuration.time_partitioning @property def use_avro_logical_types(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.use_avro_logical_types`. """ - return self._configuration.use_avro_logical_types + return self.configuration.use_avro_logical_types @property def clustering_fields(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.clustering_fields`. """ - return self._configuration.clustering_fields + return self.configuration.clustering_fields @property def schema_update_options(self): """See :attr:`google.cloud.bigquery.job.LoadJobConfig.schema_update_options`. """ - return self._configuration.schema_update_options + return self.configuration.schema_update_options @property def input_file_bytes(self): diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index c63fa0892..e6d6d682d 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -745,17 +745,15 @@ class QueryJob(_AsyncJob): _JOB_TYPE = "query" _UDF_KEY = "userDefinedFunctionResources" + _CONFIG_CLASS = QueryJobConfig def __init__(self, job_id, query, client, job_config=None): super(QueryJob, self).__init__(job_id, client) - if job_config is None: - job_config = QueryJobConfig() - if job_config.use_legacy_sql is None: - job_config.use_legacy_sql = False - - self._properties["configuration"] = job_config._properties - self._configuration = job_config + if job_config is not None: + self._properties["configuration"] = job_config._properties + if self.configuration.use_legacy_sql is None: + self.configuration.use_legacy_sql = False if query: _helpers._set_sub_prop( @@ -771,7 +769,12 @@ def allow_large_results(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.allow_large_results`. """ - return self._configuration.allow_large_results + return self.configuration.allow_large_results + + @property + def configuration(self) -> QueryJobConfig: + """The configuration for this query job.""" + return typing.cast(QueryJobConfig, super().configuration) @property def connection_properties(self) -> List[ConnectionProperty]: @@ -780,14 +783,14 @@ def connection_properties(self) -> List[ConnectionProperty]: .. versionadded:: 2.29.0 """ - return self._configuration.connection_properties + return self.configuration.connection_properties @property def create_disposition(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.create_disposition`. """ - return self._configuration.create_disposition + return self.configuration.create_disposition @property def create_session(self) -> Optional[bool]: @@ -796,21 +799,21 @@ def create_session(self) -> Optional[bool]: .. versionadded:: 2.29.0 """ - return self._configuration.create_session + return self.configuration.create_session @property def default_dataset(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.default_dataset`. """ - return self._configuration.default_dataset + return self.configuration.default_dataset @property def destination(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.destination`. """ - return self._configuration.destination + return self.configuration.destination @property def destination_encryption_configuration(self): @@ -823,28 +826,28 @@ def destination_encryption_configuration(self): See :attr:`google.cloud.bigquery.job.QueryJobConfig.destination_encryption_configuration`. """ - return self._configuration.destination_encryption_configuration + return self.configuration.destination_encryption_configuration @property def dry_run(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.dry_run`. """ - return self._configuration.dry_run + return self.configuration.dry_run @property def flatten_results(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.flatten_results`. """ - return self._configuration.flatten_results + return self.configuration.flatten_results @property def priority(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.priority`. """ - return self._configuration.priority + return self.configuration.priority @property def query(self): @@ -862,90 +865,90 @@ def query_parameters(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.query_parameters`. """ - return self._configuration.query_parameters + return self.configuration.query_parameters @property def udf_resources(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.udf_resources`. """ - return self._configuration.udf_resources + return self.configuration.udf_resources @property def use_legacy_sql(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.use_legacy_sql`. """ - return self._configuration.use_legacy_sql + return self.configuration.use_legacy_sql @property def use_query_cache(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.use_query_cache`. """ - return self._configuration.use_query_cache + return self.configuration.use_query_cache @property def write_disposition(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.write_disposition`. """ - return self._configuration.write_disposition + return self.configuration.write_disposition @property def maximum_billing_tier(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.maximum_billing_tier`. """ - return self._configuration.maximum_billing_tier + return self.configuration.maximum_billing_tier @property def maximum_bytes_billed(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.maximum_bytes_billed`. """ - return self._configuration.maximum_bytes_billed + return self.configuration.maximum_bytes_billed @property def range_partitioning(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.range_partitioning`. """ - return self._configuration.range_partitioning + return self.configuration.range_partitioning @property def table_definitions(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.table_definitions`. """ - return self._configuration.table_definitions + return self.configuration.table_definitions @property def time_partitioning(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.time_partitioning`. """ - return self._configuration.time_partitioning + return self.configuration.time_partitioning @property def clustering_fields(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.clustering_fields`. """ - return self._configuration.clustering_fields + return self.configuration.clustering_fields @property def schema_update_options(self): """See :attr:`google.cloud.bigquery.job.QueryJobConfig.schema_update_options`. """ - return self._configuration.schema_update_options + return self.configuration.schema_update_options def to_api_repr(self): """Generate a resource for :meth:`_begin`.""" # Use to_api_repr to allow for some configuration properties to be set # automatically. - configuration = self._configuration.to_api_repr() + configuration = self.configuration.to_api_repr() return { "jobReference": self._properties["jobReference"], "configuration": configuration, @@ -1257,7 +1260,7 @@ def _format_for_exception(message: str, query: str): """ template = "{message}\n\n{header}\n\n{ruler}\n{body}\n{ruler}" - lines = query.splitlines() + lines = query.splitlines() if query is not None else [""] max_line_len = max(len(line) for line in lines) header = "-----Query Job SQL Follows-----" diff --git a/tests/system/test_client.py b/tests/system/test_client.py index 14a9b04d4..a69bb92c5 100644 --- a/tests/system/test_client.py +++ b/tests/system/test_client.py @@ -2455,7 +2455,7 @@ def test_table_clones(dataset_id): # Now create a clone before modifying the original table data. copy_config = CopyJobConfig() copy_config.operation_type = OperationType.CLONE - copy_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE + copy_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY copy_job = client.copy_table( sources=table_path_source, diff --git a/tests/unit/job/test_base.py b/tests/unit/job/test_base.py index ed0dc731b..3ff96e874 100644 --- a/tests/unit/job/test_base.py +++ b/tests/unit/job/test_base.py @@ -432,11 +432,19 @@ def _set_properties_job(self): def test__set_properties_no_stats(self): config = {"test": True} resource = {"configuration": config} + expected = resource.copy() + expected["statistics"] = {} job = self._set_properties_job() + original_resource = job._properties job._set_properties(resource) - self.assertEqual(job._properties, resource) + self.assertEqual(job._properties, expected) + + # Make sure we don't mutate the object used in the request, as that + # makes debugging more difficult and leads to false positives in unit + # tests. + self.assertIsNot(job._properties, original_resource) def test__set_properties_w_creation_time(self): now, millis = self._datetime_and_millis() @@ -546,6 +554,8 @@ def test__begin_defaults(self): }, "configuration": {"test": True}, } + expected = resource.copy() + expected["statistics"] = {} job = self._set_properties_job() builder = job.to_api_repr = mock.Mock() builder.return_value = resource @@ -564,7 +574,7 @@ def test__begin_defaults(self): data=resource, timeout=None, ) - self.assertEqual(job._properties, resource) + self.assertEqual(job._properties, expected) def test__begin_explicit(self): from google.cloud.bigquery.retry import DEFAULT_RETRY @@ -578,6 +588,8 @@ def test__begin_explicit(self): }, "configuration": {"test": True}, } + expected = resource.copy() + expected["statistics"] = {} job = self._set_properties_job() builder = job.to_api_repr = mock.Mock() builder.return_value = resource @@ -598,7 +610,7 @@ def test__begin_explicit(self): data=resource, timeout=7.5, ) - self.assertEqual(job._properties, resource) + self.assertEqual(job._properties, expected) def test_exists_defaults_miss(self): from google.cloud.exceptions import NotFound @@ -685,6 +697,8 @@ def test_reload_defaults(self): }, "configuration": {"test": True}, } + expected = resource.copy() + expected["statistics"] = {} job = self._set_properties_job() job._properties["jobReference"]["location"] = self.LOCATION call_api = job._client._call_api = mock.Mock() @@ -703,7 +717,7 @@ def test_reload_defaults(self): query_params={"location": self.LOCATION}, timeout=None, ) - self.assertEqual(job._properties, resource) + self.assertEqual(job._properties, expected) def test_reload_explicit(self): from google.cloud.bigquery.retry import DEFAULT_RETRY @@ -717,6 +731,8 @@ def test_reload_explicit(self): }, "configuration": {"test": True}, } + expected = resource.copy() + expected["statistics"] = {} job = self._set_properties_job() client = _make_client(project=other_project) call_api = client._call_api = mock.Mock() @@ -736,7 +752,7 @@ def test_reload_explicit(self): query_params={}, timeout=4.2, ) - self.assertEqual(job._properties, resource) + self.assertEqual(job._properties, expected) def test_cancel_defaults(self): resource = { @@ -747,6 +763,8 @@ def test_cancel_defaults(self): }, "configuration": {"test": True}, } + expected = resource.copy() + expected["statistics"] = {} response = {"job": resource} job = self._set_properties_job() job._properties["jobReference"]["location"] = self.LOCATION @@ -764,7 +782,7 @@ def test_cancel_defaults(self): query_params={"location": self.LOCATION}, timeout=None, ) - self.assertEqual(job._properties, resource) + self.assertEqual(job._properties, expected) def test_cancel_explicit(self): other_project = "other-project-234" @@ -776,6 +794,8 @@ def test_cancel_explicit(self): }, "configuration": {"test": True}, } + expected = resource.copy() + expected["statistics"] = {} response = {"job": resource} job = self._set_properties_job() client = _make_client(project=other_project) @@ -797,7 +817,7 @@ def test_cancel_explicit(self): query_params={}, timeout=7.5, ) - self.assertEqual(job._properties, resource) + self.assertEqual(job._properties, expected) def test_cancel_w_custom_retry(self): from google.cloud.bigquery.retry import DEFAULT_RETRY @@ -811,6 +831,8 @@ def test_cancel_w_custom_retry(self): }, "configuration": {"test": True}, } + expected = resource.copy() + expected["statistics"] = {} response = {"job": resource} job = self._set_properties_job() @@ -830,7 +852,7 @@ def test_cancel_w_custom_retry(self): final_attributes.assert_called() self.assertTrue(result) - self.assertEqual(job._properties, resource) + self.assertEqual(job._properties, expected) self.assertEqual( fake_api_request.call_args_list, [ diff --git a/tests/unit/job/test_load.py b/tests/unit/job/test_load.py index cf3ce1661..c6bbaa2fb 100644 --- a/tests/unit/job/test_load.py +++ b/tests/unit/job/test_load.py @@ -451,6 +451,7 @@ def test_begin_w_bound_client(self): conn = make_connection(RESOURCE) client = _make_client(project=self.PROJECT, connection=conn) job = self._make_one(self.JOB_ID, [self.SOURCE1], self.TABLE_REF, client) + job.configuration.reference_file_schema_uri = self.REFERENCE_FILE_SCHEMA_URI path = "/projects/{}/jobs".format(self.PROJECT) with mock.patch( "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" @@ -498,6 +499,7 @@ def test_begin_w_autodetect(self): job = self._make_one( self.JOB_ID, [self.SOURCE1], self.TABLE_REF, client, config ) + job.configuration.reference_file_schema_uri = self.REFERENCE_FILE_SCHEMA_URI with mock.patch( "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" ) as final_attributes: @@ -554,19 +556,18 @@ def test_begin_w_alternate_client(self): "sourceFormat": "CSV", "useAvroLogicalTypes": True, "writeDisposition": WriteDisposition.WRITE_TRUNCATE, + "referenceFileSchemaUri": "gs://path/to/reference", "schema": { "fields": [ { "name": "full_name", "type": "STRING", "mode": "REQUIRED", - "description": None, }, { "name": "age", "type": "INTEGER", "mode": "REQUIRED", - "description": None, }, ] }, diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index f38874843..f52eb825a 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -2743,17 +2743,21 @@ def _create_job_helper(self, job_config): http = object() client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) - RESOURCE = { + resource = { + "jobReference": {"projectId": self.PROJECT, "jobId": "random-id"}, + "configuration": job_config, + } + expected = { "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY}, "configuration": job_config, } - conn = client._connection = make_connection(RESOURCE) + conn = client._connection = make_connection(resource) client.create_job(job_config=job_config) conn.api_request.assert_called_once_with( method="POST", path="/projects/%s/jobs" % self.PROJECT, - data=RESOURCE, + data=expected, timeout=DEFAULT_TIMEOUT, ) @@ -3156,7 +3160,7 @@ def test_load_table_from_uri(self): self.assertEqual(job_config.to_api_repr(), original_config_copy.to_api_repr()) self.assertIsInstance(job, LoadJob) - self.assertIsInstance(job._configuration, LoadJobConfig) + self.assertIsInstance(job.configuration, LoadJobConfig) self.assertIs(job._client, client) self.assertEqual(job.job_id, JOB) self.assertEqual(list(job.source_uris), [SOURCE_URI]) @@ -3662,7 +3666,7 @@ def test_copy_table_w_source_strings(self): creds = _make_credentials() http = object() client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) - client._connection = make_connection({}) + conn = client._connection = make_connection({}) sources = [ "dataset_wo_proj.some_table", "other_project.other_dataset.other_table", @@ -3674,6 +3678,11 @@ def test_copy_table_w_source_strings(self): job = client.copy_table(sources, destination) + # Replace job with the request instead of response so we can verify those properties. + _, kwargs = conn.api_request.call_args + request = kwargs["data"] + job._properties = request + expected_sources = [ DatasetReference(client.project, "dataset_wo_proj").table("some_table"), DatasetReference("other_project", "other_dataset").table("other_table"), @@ -3750,7 +3759,7 @@ def test_copy_table_w_valid_job_config(self): data=RESOURCE, timeout=DEFAULT_TIMEOUT, ) - self.assertIsInstance(job._configuration, CopyJobConfig) + self.assertIsInstance(job.configuration, CopyJobConfig) # the original config object should not have been modified assert job_config.to_api_repr() == original_config_copy.to_api_repr()