From a77538af24802ee3d78b2b6eae5322e10355d444 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 16 Aug 2017 17:10:06 -0700 Subject: [PATCH] BigQuery - add get_query_results method. This method calls the getQueryResults API directly and returns a QueryResults object. Note: the response from this API does not include the query, so I modified the constructor to make query optional in this case. --- bigquery/google/cloud/bigquery/client.py | 35 +++++++++++++ bigquery/google/cloud/bigquery/query.py | 6 +++ bigquery/tests/system.py | 7 +++ bigquery/tests/unit/test_client.py | 63 ++++++++++++++++++++++++ 4 files changed, 111 insertions(+) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index d9ff17d71720..52c462240097 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -162,6 +162,41 @@ def dataset(self, dataset_name, project=None): """ return Dataset(dataset_name, client=self, project=project) + def get_query_results(self, job_id, project=None, timeout_ms=None): + """Get the query results object for a query job. + + :type job_id: str + :param job_id: Name of the query job. + + :type project: str + :param project: + (Optional) project ID for the query job (defaults to the project of + the client). + + :type timeout_ms: int + :param timeout_ms: + (Optional) number of milliseconds the the API call should wait for + the query to complete before the request times out. + + :rtype: :class:`google.cloud.bigquery.query.QueryResults` + :returns: a new ``QueryResults`` instance + """ + + extra_params = {'maxResults': 0} + + if project is None: + project = self.project + + if timeout_ms is not None: + extra_params['timeoutMs'] = timeout_ms + + path = '/projects/{}/queries/{}'.format(project, job_id) + + resource = self._connection.api_request( + method='GET', path=path, query_params=extra_params) + + return QueryResults.from_api_repr(resource, self) + def job_from_resource(self, resource): """Detect correct job type from resource and instantiate. diff --git a/bigquery/google/cloud/bigquery/query.py b/bigquery/google/cloud/bigquery/query.py index dfa0a422a68a..c01017af0d30 100644 --- a/bigquery/google/cloud/bigquery/query.py +++ b/bigquery/google/cloud/bigquery/query.py @@ -76,6 +76,12 @@ def __init__(self, query, client, udf_resources=(), query_parameters=()): self.query_parameters = query_parameters self._job = None + @classmethod + def from_api_repr(cls, api_response, client): + instance = cls(None, client) + instance._set_properties(api_response) + return instance + @classmethod def from_query_job(cls, job): """Factory: construct from an existing job. diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 5d0b38ffac41..3cff1b001731 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -599,6 +599,13 @@ def test_job_cancel(self): # raise an error, and that the job completed (in the `retry()` # above). + def test_get_query_results(self): + job_id = 'test-get-query-results-' + str(uuid.uuid4()) + query_job = Config.CLIENT.run_async_query(job_id, 'SELECT 1') + query_job.begin() + results = Config.CLIENT.get_query_results(job_id) + self.assertEqual(results.total_rows, 1) + def test_sync_query_w_legacy_sql_types(self): naive = datetime.datetime(2016, 12, 5, 12, 41, 9) stamp = '%s %s' % (naive.date().isoformat(), naive.time().isoformat()) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index e71f3b99fbe0..33cd59513efc 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -45,6 +45,64 @@ def test_ctor(self): self.assertIs(client._connection.credentials, creds) self.assertIs(client._connection.http, http) + def test_get_job_miss_w_explicit_project_and_timeout(self): + from google.cloud.exceptions import NotFound + + project = 'PROJECT' + creds = _make_credentials() + client = self._make_one(project, creds) + conn = client._connection = _Connection() + + with self.assertRaises(NotFound): + client.get_query_results( + 'nothere', project='other-project', timeout_ms=500) + + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual( + req['path'], '/projects/other-project/queries/nothere') + self.assertEqual( + req['query_params'], {'maxResults': 0, 'timeoutMs': 500}) + + def test_get_query_results_hit(self): + project = 'PROJECT' + job_id = 'query_job' + data = { + 'kind': 'bigquery#getQueryResultsResponse', + 'etag': 'some-tag', + 'schema': { + 'fields': [ + { + 'name': 'title', + 'type': 'STRING', + 'mode': 'NULLABLE' + }, + { + 'name': 'unique_words', + 'type': 'INTEGER', + 'mode': 'NULLABLE' + } + ] + }, + 'jobReference': { + 'projectId': project, + 'jobId': job_id, + }, + 'totalRows': '10', + 'totalBytesProcessed': '2464625', + 'jobComplete': True, + 'cacheHit': False, + } + + creds = _make_credentials() + client = self._make_one(project, creds) + client._connection = _Connection(data) + query_results = client.get_query_results(job_id) + + self.assertEqual(query_results.total_rows, 10) + self.assertTrue(query_results.complete) + def test_list_projects_defaults(self): import six from google.cloud.bigquery.client import Project @@ -607,6 +665,11 @@ def __init__(self, *responses): self._requested = [] def api_request(self, **kw): + from google.cloud.exceptions import NotFound self._requested.append(kw) + + if len(self._responses) == 0: + raise NotFound('miss') + response, self._responses = self._responses[0], self._responses[1:] return response