From f305f04394c2dae5001e1e537038859d6253c1ad Mon Sep 17 00:00:00 2001 From: HemangChothani Date: Tue, 6 Oct 2020 18:55:20 +0530 Subject: [PATCH 1/6] fix: broken create_job method --- google/cloud/bigquery/client.py | 12 +++++++----- tests/unit/test_client.py | 20 -------------------- 2 files changed, 7 insertions(+), 25 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index fcb18385d..fa08d1c2b 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -1617,6 +1617,7 @@ def create_job(self, job_config, retry=DEFAULT_RETRY): ) destination = _get_sub_prop(job_config, ["load", "destinationTable"]) source_uris = _get_sub_prop(job_config, ["load", "sourceUris"]) + destination = TableReference.from_api_repr(destination) return self.load_table_from_uri( source_uris, destination, job_config=load_job_config, retry=retry ) @@ -1625,11 +1626,9 @@ def create_job(self, job_config, retry=DEFAULT_RETRY): job_config ) destination = _get_sub_prop(job_config, ["copy", "destinationTable"]) + destination = TableReference.from_api_repr(destination) sources = [] source_configs = _get_sub_prop(job_config, ["copy", "sourceTables"]) - - if source_configs is None: - source_configs = [_get_sub_prop(job_config, ["copy", "sourceTable"])] for source_config in source_configs: table_ref = TableReference.from_api_repr(source_config) sources.append(table_ref) @@ -1641,10 +1640,13 @@ def create_job(self, job_config, retry=DEFAULT_RETRY): job_config ) source = _get_sub_prop(job_config, ["extract", "sourceTable"]) - source_type = "Table" - if not source: + if source: + source_type = "Table" + source = TableReference.from_api_repr(source) + else: source = _get_sub_prop(job_config, ["extract", "sourceModel"]) source_type = "Model" + source = ModelReference.from_api_repr(source) destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"]) return self.extract_table( source, diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index f44201ab8..11d7586a2 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -3631,26 +3631,6 @@ def test_create_job_copy_config(self): configuration, "google.cloud.bigquery.client.Client.copy_table", ) - def test_create_job_copy_config_w_single_source(self): - configuration = { - "copy": { - "sourceTable": { - "projectId": self.PROJECT, - "datasetId": self.DS_ID, - "tableId": "source_table", - }, - "destinationTable": { - "projectId": self.PROJECT, - "datasetId": self.DS_ID, - "tableId": "destination_table", - }, - } - } - - self._create_job_helper( - configuration, "google.cloud.bigquery.client.Client.copy_table", - ) - def test_create_job_extract_config(self): configuration = { "extract": { From 38ce75eb389d90c938ee145307f65af1425f00cd Mon Sep 17 00:00:00 2001 From: HemangChothani Date: Wed, 7 Oct 2020 13:44:44 +0530 Subject: [PATCH 2/6] fix: changes in unit tests --- tests/unit/test_client.py | 54 ++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 11d7586a2..f0a6810d9 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -3577,21 +3577,29 @@ def test_delete_table_w_not_found_ok_true(self): conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) - def _create_job_helper(self, job_config, client_method): + def _create_job_helper(self, job_config): + from google.cloud.bigquery._helpers import _del_sub_prop + creds = 
_make_credentials() http = object() client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) - client._connection = make_connection() - rf1 = mock.Mock() - get_config_patch = mock.patch( - "google.cloud.bigquery.job._JobConfig.from_api_repr", return_value=rf1, - ) - load_patch = mock.patch(client_method, autospec=True) + RESOURCE = { + "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY}, + "configuration": job_config, + } + conn = client._connection = make_connection(RESOURCE) + client.create_job(job_config=job_config) - with load_patch as client_method, get_config_patch: - client.create_job(job_config=job_config) - client_method.assert_called_once() + if "query" in job_config: + _del_sub_prop(job_config, ["query", "destinationTable"]) + + conn.api_request.assert_called_once_with( + method="POST", + path="/projects/%s/jobs" % self.PROJECT, + data=RESOURCE, + timeout=None, + ) def test_create_job_load_config(self): configuration = { @@ -3605,9 +3613,7 @@ def test_create_job_load_config(self): } } - self._create_job_helper( - configuration, "google.cloud.bigquery.client.Client.load_table_from_uri" - ) + self._create_job_helper(configuration) def test_create_job_copy_config(self): configuration = { @@ -3627,9 +3633,7 @@ def test_create_job_copy_config(self): } } - self._create_job_helper( - configuration, "google.cloud.bigquery.client.Client.copy_table", - ) + self._create_job_helper(configuration) def test_create_job_extract_config(self): configuration = { @@ -3642,9 +3646,7 @@ def test_create_job_extract_config(self): "destinationUris": ["gs://test_bucket/dst_object*"], } } - self._create_job_helper( - configuration, "google.cloud.bigquery.client.Client.extract_table", - ) + self._create_job_helper(configuration) def test_create_job_extract_config_for_model(self): configuration = { @@ -3657,17 +3659,17 @@ def test_create_job_extract_config_for_model(self): "destinationUris": ["gs://test_bucket/dst_object*"], } } - self._create_job_helper( - configuration, "google.cloud.bigquery.client.Client.extract_table", - ) + self._create_job_helper(configuration) def test_create_job_query_config(self): configuration = { - "query": {"query": "query", "destinationTable": {"tableId": "table_id"}} + "query": { + "query": "query", + "destinationTable": {"tableId": "table_id"}, + "useLegacySql": False, + } } - self._create_job_helper( - configuration, "google.cloud.bigquery.client.Client.query", - ) + self._create_job_helper(configuration) def test_create_job_query_config_w_rateLimitExceeded_error(self): from google.cloud.exceptions import Forbidden From 14bbe0175bc24d6572b5bed6f9d7853b85fa2ee6 Mon Sep 17 00:00:00 2001 From: HemangChothani Date: Mon, 12 Oct 2020 20:16:58 +0530 Subject: [PATCH 3/6] fix: fix sourceTable thing --- google/cloud/bigquery/client.py | 2 ++ google/cloud/bigquery/job.py | 11 +++++++++-- tests/unit/test_client.py | 18 ++++++++++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index fa08d1c2b..07a7bd989 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -1629,6 +1629,8 @@ def create_job(self, job_config, retry=DEFAULT_RETRY): destination = TableReference.from_api_repr(destination) sources = [] source_configs = _get_sub_prop(job_config, ["copy", "sourceTables"]) + if source_configs is None: + source_configs = [_get_sub_prop(job_config, ["copy", "sourceTable"])] for source_config in source_configs: table_ref = 
TableReference.from_api_repr(source_config) sources.append(table_ref) diff --git a/google/cloud/bigquery/job.py b/google/cloud/bigquery/job.py index 20bce597a..7a28af653 100644 --- a/google/cloud/bigquery/job.py +++ b/google/cloud/bigquery/job.py @@ -1860,7 +1860,6 @@ def destination_encryption_configuration(self): def to_api_repr(self): """Generate a resource for :meth:`_begin`.""" - source_refs = [ { "projectId": table.project, @@ -1871,7 +1870,15 @@ def to_api_repr(self): ] configuration = self._configuration.to_api_repr() - _helpers._set_sub_prop(configuration, ["copy", "sourceTables"], source_refs) + source_table = _helpers._get_sub_prop( + self._configuration._properties, ["copy", "sourceTable"] + ) + if source_table: + _helpers._set_sub_prop( + configuration, ["copy", "sourceTable"], source_refs[0] + ) + else: + _helpers._set_sub_prop(configuration, ["copy", "sourceTables"], source_refs) _helpers._set_sub_prop( configuration, ["copy", "destinationTable"], diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index f0a6810d9..cf2a0585f 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -3635,6 +3635,24 @@ def test_create_job_copy_config(self): self._create_job_helper(configuration) + def test_create_job_copy_config_w_single_source(self): + configuration = { + "copy": { + "sourceTable": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": "source_table", + }, + "destinationTable": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": "destination_table", + }, + } + } + + self._create_job_helper(configuration) + def test_create_job_extract_config(self): configuration = { "extract": { From f94998face9305b8aec0a06fe07dc615f6b5e8d1 Mon Sep 17 00:00:00 2001 From: HemangChothani Date: Tue, 13 Oct 2020 14:48:20 +0530 Subject: [PATCH 4/6] fix: handle sourceTable passed in job resource --- google/cloud/bigquery/job.py | 8 +++----- tests/unit/test_client.py | 9 +++++++-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/google/cloud/bigquery/job.py b/google/cloud/bigquery/job.py index 7a28af653..d5e11839a 100644 --- a/google/cloud/bigquery/job.py +++ b/google/cloud/bigquery/job.py @@ -1874,11 +1874,9 @@ def to_api_repr(self): self._configuration._properties, ["copy", "sourceTable"] ) if source_table: - _helpers._set_sub_prop( - configuration, ["copy", "sourceTable"], source_refs[0] - ) - else: - _helpers._set_sub_prop(configuration, ["copy", "sourceTables"], source_refs) + _helpers._del_sub_prop(configuration, ["copy", "sourceTable"]) + + _helpers._set_sub_prop(configuration, ["copy", "sourceTables"], source_refs) _helpers._set_sub_prop( configuration, ["copy", "destinationTable"], diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index cf2a0585f..bebe03233 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -3578,7 +3578,7 @@ def test_delete_table_w_not_found_ok_true(self): conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) def _create_job_helper(self, job_config): - from google.cloud.bigquery._helpers import _del_sub_prop + from google.cloud.bigquery import _helpers creds = _make_credentials() http = object() @@ -3591,8 +3591,13 @@ def _create_job_helper(self, job_config): conn = client._connection = make_connection(RESOURCE) client.create_job(job_config=job_config) + if "copy" in job_config: + if "sourceTable" in job_config["copy"]: + source = _helpers._get_sub_prop(job_config, ["copy", "sourceTable"]) + _helpers._del_sub_prop(job_config, 
["copy", "sourceTable"]) + _helpers._set_sub_prop(job_config, ["copy", "sourceTables"], [source]) if "query" in job_config: - _del_sub_prop(job_config, ["query", "destinationTable"]) + _helpers._del_sub_prop(job_config, ["query", "destinationTable"]) conn.api_request.assert_called_once_with( method="POST", From e130f9af41291ada0003bad4164641b05f556076 Mon Sep 17 00:00:00 2001 From: HemangChothani Date: Mon, 19 Oct 2020 11:21:55 +0530 Subject: [PATCH 5/6] fix: remove delete destination table from query --- google/cloud/bigquery/client.py | 2 -- tests/unit/test_client.py | 19 +++---------------- 2 files changed, 3 insertions(+), 18 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index fead08919..b31435eeb 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -52,7 +52,6 @@ from google.cloud.bigquery._helpers import _record_field_to_json from google.cloud.bigquery._helpers import _str_or_none from google.cloud.bigquery._helpers import _verify_job_config_type -from google.cloud.bigquery._helpers import _del_sub_prop from google.cloud.bigquery._http import Connection from google.cloud.bigquery import _pandas_helpers from google.cloud.bigquery.dataset import Dataset @@ -1658,7 +1657,6 @@ def create_job(self, job_config, retry=DEFAULT_RETRY): ) elif "query" in job_config: copy_config = copy.deepcopy(job_config) - _del_sub_prop(copy_config, ["query", "destinationTable"]) query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr( copy_config ) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index e8effc8a8..9cc9f44e3 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -3574,8 +3574,6 @@ def test_delete_table_w_not_found_ok_true(self): conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) def _create_job_helper(self, job_config): - from google.cloud.bigquery import _helpers - creds = _make_credentials() http = object() client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) @@ -3587,14 +3585,6 @@ def _create_job_helper(self, job_config): conn = client._connection = make_connection(RESOURCE) client.create_job(job_config=job_config) - # if "copy" in job_config: - # if "sourceTable" in job_config["copy"]: - # source = _helpers._get_sub_prop(job_config, ["copy", "sourceTable"]) - # _helpers._del_sub_prop(job_config, ["copy", "sourceTable"]) - # _helpers._set_sub_prop(job_config, ["copy", "sourceTables"], [source]) - if "query" in job_config: - _helpers._del_sub_prop(job_config, ["query", "destinationTable"]) - conn.api_request.assert_called_once_with( method="POST", path="/projects/%s/jobs" % self.PROJECT, @@ -3716,9 +3706,9 @@ def test_create_job_query_config_w_rateLimitExceeded_error(self): } }, } - data_without_destination = { + data = { "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY}, - "configuration": {"query": {"query": query, "useLegacySql": False}}, + "configuration": configuration, } creds = _make_credentials() @@ -3745,10 +3735,7 @@ def test_create_job_query_config_w_rateLimitExceeded_error(self): self.assertEqual( fake_api_request.call_args_list[1], mock.call( - method="POST", - path="/projects/PROJECT/jobs", - data=data_without_destination, - timeout=None, + method="POST", path="/projects/PROJECT/jobs", data=data, timeout=None, ), ) From 4223b7828c87f55e7d0743fba2c532df420b8b2a Mon Sep 17 00:00:00 2001 From: HemangChothani Date: Mon, 19 Oct 2020 20:34:24 +0530 Subject: [PATCH 6/6] fix: revert 
destination table for query --- google/cloud/bigquery/client.py | 2 ++ tests/unit/test_client.py | 13 ++++++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index b31435eeb..914379ea0 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -48,6 +48,7 @@ from google.cloud import exceptions from google.cloud.client import ClientWithProject +from google.cloud.bigquery._helpers import _del_sub_prop from google.cloud.bigquery._helpers import _get_sub_prop from google.cloud.bigquery._helpers import _record_field_to_json from google.cloud.bigquery._helpers import _str_or_none @@ -1657,6 +1658,7 @@ def create_job(self, job_config, retry=DEFAULT_RETRY): ) elif "query" in job_config: copy_config = copy.deepcopy(job_config) + _del_sub_prop(copy_config, ["query", "destinationTable"]) query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr( copy_config ) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 9cc9f44e3..1a0ce3225 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -3574,6 +3574,8 @@ def test_delete_table_w_not_found_ok_true(self): conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) def _create_job_helper(self, job_config): + from google.cloud.bigquery import _helpers + creds = _make_credentials() http = object() client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) @@ -3584,6 +3586,8 @@ def _create_job_helper(self, job_config): } conn = client._connection = make_connection(RESOURCE) client.create_job(job_config=job_config) + if "query" in job_config: + _helpers._del_sub_prop(job_config, ["query", "destinationTable"]) conn.api_request.assert_called_once_with( method="POST", @@ -3706,9 +3710,9 @@ def test_create_job_query_config_w_rateLimitExceeded_error(self): } }, } - data = { + data_without_destination = { "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY}, - "configuration": configuration, + "configuration": {"query": {"query": query, "useLegacySql": False}}, } creds = _make_credentials() @@ -3735,7 +3739,10 @@ def test_create_job_query_config_w_rateLimitExceeded_error(self): self.assertEqual( fake_api_request.call_args_list[1], mock.call( - method="POST", path="/projects/PROJECT/jobs", data=data, timeout=None, + method="POST", + path="/projects/PROJECT/jobs", + data=data_without_destination, + timeout=None, ), )
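
A minimal usage sketch (not taken from the patches above, and using placeholder project, dataset, and table IDs) of the behaviour this series restores: passing a copy configuration to Client.create_job() with the singular "sourceTable" key, which the client now converts into TableReference objects before delegating to copy_table().

    from google.cloud import bigquery

    client = bigquery.Client(project="my-project")  # placeholder project ID

    copy_config = {
        "copy": {
            "sourceTable": {  # singular form, normalized by create_job()
                "projectId": "my-project",
                "datasetId": "my_dataset",
                "tableId": "source_table",
            },
            "destinationTable": {
                "projectId": "my-project",
                "datasetId": "my_dataset",
                "tableId": "destination_table",
            },
        }
    }

    copy_job = client.create_job(job_config=copy_config)
    copy_job.result()  # block until the copy job finishes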
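
Likewise, a sketch of the extract-for-model path touched by patch 1, where create_job() converts the "sourceModel" mapping with ModelReference.from_api_repr() before calling extract_table(). The model ID and destination bucket are placeholders, and the client object is reused from the previous sketch.

    extract_config = {
        "extract": {
            "sourceModel": {
                "projectId": "my-project",
                "datasetId": "my_dataset",
                "modelId": "my_model",
            },
            # placeholder Cloud Storage destination for the exported model
            "destinationUris": ["gs://my-bucket/model-export-*"],
        }
    }

    extract_job = client.create_job(job_config=extract_config)
    extract_job.result()  # block until the extract job finishes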