feat: add reload argument to *Job.done() functions (#341)
This enables checking the job status without making an API call.

It also fixes an inconsistency in `QueryJob`, where a job could be
reported as "done" before the results of a `getQueryResults` API call
were available.

Follow-up to #340
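
A minimal usage sketch of the new argument (the `Client` setup and the
query here are illustrative, not part of this commit):

    from google.cloud import bigquery

    client = bigquery.Client()
    job = client.query("SELECT 1")

    # reload=False checks only the locally cached job state; no
    # jobs.get or getQueryResults request is made.
    if not job.done(reload=False):
        print("still running, as of the last known state")

    # The default (reload=True) still refreshes unfinished jobs via
    # the API before reporting completeness.
    job.done()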
tswast authored Oct 28, 2020
1 parent 5a925ec commit e51fd45
Showing 3 changed files with 218 additions and 123 deletions.
67 changes: 36 additions & 31 deletions google/cloud/bigquery/job.py
@@ -767,7 +767,7 @@ def _set_future_result(self):
# set, do not call set_result/set_exception again.
# Note: self._result_set is set to True in set_result and
# set_exception, in case those methods are invoked directly.
if self.state != _DONE_STATE or self._result_set:
if not self.done(reload=False) or self._result_set:
return

if self.error_result is not None:
@@ -776,21 +776,24 @@ def _set_future_result(self):
else:
self.set_result(self)

def done(self, retry=DEFAULT_RETRY, timeout=None):
"""Refresh the job and checks if it is complete.
def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
"""Checks if the job is complete.
Args:
retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
reload (Optional[bool]):
If ``True``, make an API call to refresh the job state of
unfinished jobs before checking. Default ``True``.
Returns:
bool: True if the job is complete, False otherwise.
"""
# Do not refresh if the state is already done, as the job will not
# change once complete.
if self.state != _DONE_STATE:
if self.state != _DONE_STATE and reload:
self.reload(retry=retry, timeout=timeout)
return self.state == _DONE_STATE

@@ -3073,7 +3076,7 @@ def estimated_bytes_processed(self):
result = int(result)
return result

def done(self, retry=DEFAULT_RETRY, timeout=None):
def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
"""Refresh the job and checks if it is complete.
Args:
@@ -3082,10 +3085,25 @@ def done(self, retry=DEFAULT_RETRY, timeout=None):
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
reload (Optional[bool]):
If ``True``, make an API call to refresh the job state of
unfinished jobs before checking. Default ``True``.
Returns:
bool: True if the job is complete, False otherwise.
"""
is_done = (
# Only consider a QueryJob complete when we know we have the final
# query results available.
self._query_results is not None
and self._query_results.complete
and self.state == _DONE_STATE
)
# Do not refresh if the state is already done, as the job will not
# change once complete.
if not reload or is_done:
return is_done

# Since the API to getQueryResults can hang for up to the timeout value
# (default of 10 seconds), set the timeout parameter to ensure that
# the timeout from the futures API is respected. See:
@@ -3103,23 +3121,20 @@ def done(self, retry=DEFAULT_RETRY, timeout=None):
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

# Do not refresh if the state is already done, as the job will not
# change once complete.
if self.state != _DONE_STATE:
self._query_results = self._client._get_query_results(
self.job_id,
retry,
project=self.project,
timeout_ms=timeout_ms,
location=self.location,
timeout=transport_timeout,
)
self._query_results = self._client._get_query_results(
self.job_id,
retry,
project=self.project,
timeout_ms=timeout_ms,
location=self.location,
timeout=transport_timeout,
)

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if self._query_results.complete:
self.reload(retry=retry, timeout=transport_timeout)
# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if self._query_results.complete and self.state != _DONE_STATE:
self.reload(retry=retry, timeout=transport_timeout)

return self.state == _DONE_STATE

@@ -3231,16 +3246,6 @@ def result(
"""
try:
super(QueryJob, self).result(retry=retry, timeout=timeout)

# Return an iterator instead of returning the job.
if not self._query_results:
self._query_results = self._client._get_query_results(
self.job_id,
retry,
project=self.project,
location=self.location,
timeout=timeout,
)
except exceptions.GoogleCloudError as exc:
exc.message += self._format_for_exception(self.query, self.job_id)
exc.query_job = self
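
As a reading aid, here is a self-contained sketch that mirrors the new
`QueryJob.done()` control flow above with stand-in classes (illustrative
only; the real method talks to the BigQuery REST API and handles retries
and timeouts):

    class FakeQueryResults:
        """Stands in for a getQueryResults response."""

        def __init__(self, complete):
            self.complete = complete


    class FakeQueryJob:
        def __init__(self):
            self.state = "RUNNING"
            self._query_results = None

        def _get_query_results(self):
            # Pretend the backend finished the query on this call.
            return FakeQueryResults(complete=True)

        def reload(self):
            # Pretend jobs.get now reports the job as done.
            self.state = "DONE"

        def done(self, reload=True):
            # A QueryJob only counts as done once the final query
            # results are known AND the job resource says DONE.
            is_done = (
                self._query_results is not None
                and self._query_results.complete
                and self.state == "DONE"
            )
            if not reload or is_done:
                return is_done
            self._query_results = self._get_query_results()
            # Reload the job only once the query is complete, so fields
            # such as the destination table are populated.
            if self._query_results.complete and self.state != "DONE":
                self.reload()
            return self.state == "DONE"


    job = FakeQueryJob()
    print(job.done(reload=False))  # False: nothing cached, no "API" calls.
    print(job.done())              # True: fetches results, then reloads.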
101 changes: 90 additions & 11 deletions tests/unit/test_job.py
@@ -45,6 +45,8 @@
except (ImportError, AttributeError): # pragma: NO COVER
tqdm = None

import google.cloud.bigquery.query


def _make_credentials():
import google.auth.credentials
@@ -3942,10 +3944,6 @@ def _make_resource(self, started=False, ended=False):
resource = super(TestQueryJob, self)._make_resource(started, ended)
config = resource["configuration"]["query"]
config["query"] = self.QUERY

if ended:
resource["status"] = {"state": "DONE"}

return resource

def _verifyBooleanResourceProperties(self, job, config):
@@ -4211,6 +4209,9 @@ def test_done(self):
client = _make_client(project=self.PROJECT)
resource = self._make_resource(ended=True)
job = self._get_target_class().from_api_repr(resource, client)
job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
{"jobComplete": True, "jobReference": resource["jobReference"]}
)
self.assertTrue(job.done())

def test_done_w_timeout(self):
@@ -4668,35 +4669,110 @@ def test_result(self):
from google.cloud.bigquery.table import RowIterator

query_resource = {
"jobComplete": False,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
}
query_resource_done = {
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "2",
}
job_resource = self._make_resource(started=True)
job_resource_done = self._make_resource(started=True, ended=True)
job_resource_done["configuration"]["query"]["destinationTable"] = {
"projectId": "dest-project",
"datasetId": "dest_dataset",
"tableId": "dest_table",
}
tabledata_resource = {
# Explicitly set totalRows to be different from the query response.
# to test update during iteration.
# Explicitly set totalRows to be different from the initial
# response to test update during iteration.
"totalRows": "1",
"pageToken": None,
"rows": [{"f": [{"v": "abc"}]}],
}
connection = _make_connection(query_resource, tabledata_resource)
client = _make_client(self.PROJECT, connection=connection)
resource = self._make_resource(ended=True)
job = self._get_target_class().from_api_repr(resource, client)
conn = _make_connection(
query_resource, query_resource_done, job_resource_done, tabledata_resource
)
client = _make_client(self.PROJECT, connection=conn)
job = self._get_target_class().from_api_repr(job_resource, client)

result = job.result()

self.assertIsInstance(result, RowIterator)
self.assertEqual(result.total_rows, 2)

rows = list(result)
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].col1, "abc")
# Test that the total_rows property has changed during iteration, based
# on the response from tabledata.list.
self.assertEqual(result.total_rows, 1)

query_results_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
query_params={"maxResults": 0},
timeout=None,
)
reload_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}",
query_params={},
timeout=None,
)
tabledata_call = mock.call(
method="GET",
path="/projects/dest-project/datasets/dest_dataset/tables/dest_table/data",
query_params={},
timeout=None,
)
conn.api_request.assert_has_calls(
[query_results_call, query_results_call, reload_call, tabledata_call]
)

def test_result_with_done_job_calls_get_query_results(self):
query_resource_done = {
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "1",
}
job_resource = self._make_resource(started=True, ended=True)
job_resource["configuration"]["query"]["destinationTable"] = {
"projectId": "dest-project",
"datasetId": "dest_dataset",
"tableId": "dest_table",
}
tabledata_resource = {
"totalRows": "1",
"pageToken": None,
"rows": [{"f": [{"v": "abc"}]}],
}
conn = _make_connection(query_resource_done, tabledata_resource)
client = _make_client(self.PROJECT, connection=conn)
job = self._get_target_class().from_api_repr(job_resource, client)

result = job.result()

rows = list(result)
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].col1, "abc")

query_results_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
query_params={"maxResults": 0},
timeout=None,
)
tabledata_call = mock.call(
method="GET",
path="/projects/dest-project/datasets/dest_dataset/tables/dest_table/data",
query_params={},
timeout=None,
)
conn.api_request.assert_has_calls([query_results_call, tabledata_call])

def test_result_with_max_results(self):
from google.cloud.bigquery.table import RowIterator

@@ -4938,6 +5014,9 @@ def test_result_error(self):
"errors": [error_result],
"state": "DONE",
}
job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
{"jobComplete": True, "jobReference": job._properties["jobReference"]}
)
job._set_future_result()

with self.assertRaises(exceptions.GoogleCloudError) as exc_info: