Skip to content

Commit

Permalink
feat: pass retry from Job.result() to Job.done() (#41)
Browse files Browse the repository at this point in the history
* feat(bigquery): pass retry from Job.result() to Job.done().

* fix merge conflicts

* drop the comment

* use kwargs sentinel

* check the mock retry

* update dependencies

* use kwargs pattern

* feat: added unit test for retry

* feat: added more exceptions

Co-authored-by: Tim Swast <[email protected]>
Co-authored-by: HemangChothani <[email protected]>
  • Loading branch information
3 people authored Nov 3, 2020
1 parent 8a8080b commit 284e17a
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 18 deletions.
16 changes: 7 additions & 9 deletions google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,8 +819,9 @@ def result(self, retry=DEFAULT_RETRY, timeout=None):
"""
if self.state is None:
self._begin(retry=retry, timeout=timeout)
# TODO: modify PollingFuture so it can pass a retry argument to done().
return super(_AsyncJob, self).result(timeout=timeout)

kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry}
return super(_AsyncJob, self).result(timeout=timeout, **kwargs)

def cancelled(self):
"""Check if the job has been cancelled.
Expand Down Expand Up @@ -1845,7 +1846,7 @@ def destination(self):
"""
return TableReference.from_api_repr(
_helpers._get_sub_prop(
self._properties, ["configuration", "copy", "destinationTable"],
self._properties, ["configuration", "copy", "destinationTable"]
)
)

Expand Down Expand Up @@ -2043,10 +2044,7 @@ def __init__(self, job_id, source, destination_uris, client, job_config=None):
self._configuration = job_config

if source:
source_ref = {
"projectId": source.project,
"datasetId": source.dataset_id,
}
source_ref = {"projectId": source.project, "datasetId": source.dataset_id}

if isinstance(source, (Table, TableListItem, TableReference)):
source_ref["tableId"] = source.table_id
Expand Down Expand Up @@ -3138,10 +3136,10 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):

return self.state == _DONE_STATE

def _blocking_poll(self, timeout=None):
def _blocking_poll(self, timeout=None, **kwargs):
self._done_timeout = timeout
self._transport_timeout = timeout
super(QueryJob, self)._blocking_poll(timeout=timeout)
super(QueryJob, self)._blocking_poll(timeout=timeout, **kwargs)

@staticmethod
def _format_for_exception(query, job_id):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
# 'Development Status :: 5 - Production/Stable'
release_status = "Development Status :: 5 - Production/Stable"
dependencies = [
"google-api-core[grpc] >= 1.22.2, < 2.0.0dev",
"google-api-core[grpc] >= 1.23.0, < 2.0.0dev",
"proto-plus >= 1.10.0",
"google-cloud-core >= 1.4.1, < 2.0dev",
"google-resumable-media >= 0.6.0, < 2.0dev",
Expand Down
2 changes: 1 addition & 1 deletion testing/constraints-3.6.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
google-api-core==1.22.2
google-api-core==1.23.0
google-cloud-bigquery-storage==2.0.0
google-cloud-core==1.4.1
google-resumable-media==0.6.0
Expand Down
67 changes: 60 additions & 7 deletions tests/unit/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ def test_cancel_w_custom_retry(self):
job = self._set_properties_job()

api_request_patcher = mock.patch.object(
job._client._connection, "api_request", side_effect=[ValueError, response],
job._client._connection, "api_request", side_effect=[ValueError, response]
)
retry = DEFAULT_RETRY.with_deadline(1).with_predicate(
lambda exc: isinstance(exc, ValueError)
Expand All @@ -885,7 +885,7 @@ def test_cancel_w_custom_retry(self):
[
mock.call(method="POST", path=api_path, query_params={}, timeout=7.5),
mock.call(
method="POST", path=api_path, query_params={}, timeout=7.5,
method="POST", path=api_path, query_params={}, timeout=7.5
), # was retried once
],
)
Expand Down Expand Up @@ -1034,7 +1034,6 @@ def test_result_w_retry_wo_state(self):
custom_predicate = mock.Mock()
custom_predicate.return_value = True
custom_retry = google.api_core.retry.Retry(predicate=custom_predicate)

self.assertIs(job.result(retry=custom_retry), job)

begin_call = mock.call(
Expand Down Expand Up @@ -2757,7 +2756,7 @@ def test_cancel_w_bound_client(self):
final_attributes.assert_called_with({"path": PATH}, client, job)

conn.api_request.assert_called_once_with(
method="POST", path=PATH, query_params={}, timeout=None,
method="POST", path=PATH, query_params={}, timeout=None
)
self._verifyResourceProperties(job, RESOURCE)

Expand All @@ -2779,7 +2778,7 @@ def test_cancel_w_alternate_client(self):

conn1.api_request.assert_not_called()
conn2.api_request.assert_called_once_with(
method="POST", path=PATH, query_params={}, timeout=None,
method="POST", path=PATH, query_params={}, timeout=None
)
self._verifyResourceProperties(job, RESOURCE)

Expand Down Expand Up @@ -3205,7 +3204,7 @@ def test_exists_miss_w_bound_client(self):
final_attributes.assert_called_with({"path": PATH}, client, job)

conn.api_request.assert_called_once_with(
method="GET", path=PATH, query_params={"fields": "id"}, timeout=None,
method="GET", path=PATH, query_params={"fields": "id"}, timeout=None
)

def test_exists_hit_w_alternate_client(self):
Expand Down Expand Up @@ -3620,7 +3619,7 @@ def test_exists_miss_w_bound_client(self):
final_attributes.assert_called_with({"path": PATH}, client, job)

conn.api_request.assert_called_once_with(
method="GET", path=PATH, query_params={"fields": "id"}, timeout=None,
method="GET", path=PATH, query_params={"fields": "id"}, timeout=None
)

def test_exists_hit_w_alternate_client(self):
Expand Down Expand Up @@ -4812,6 +4811,60 @@ def test_result_with_max_results(self):
tabledata_list_request[1]["query_params"]["maxResults"], max_results
)

def test_result_w_retry(self):
from google.cloud.bigquery.table import RowIterator

query_resource = {
"jobComplete": False,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
}
query_resource_done = {
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "2",
}
job_resource = self._make_resource(started=True)
job_resource_done = self._make_resource(started=True, ended=True)
job_resource_done["configuration"]["query"]["destinationTable"] = {
"projectId": "dest-project",
"datasetId": "dest_dataset",
"tableId": "dest_table",
}

connection = _make_connection(
exceptions.NotFound("not normally retriable"),
query_resource,
exceptions.NotFound("not normally retriable"),
query_resource_done,
exceptions.NotFound("not normally retriable"),
job_resource_done,
)
client = _make_client(self.PROJECT, connection=connection)
job = self._get_target_class().from_api_repr(job_resource, client)

custom_predicate = mock.Mock()
custom_predicate.return_value = True
custom_retry = google.api_core.retry.Retry(predicate=custom_predicate)

self.assertIsInstance(job.result(retry=custom_retry), RowIterator)
query_results_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
query_params={"maxResults": 0},
timeout=None,
)
reload_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}",
query_params={},
timeout=None,
)

connection.api_request.assert_has_calls(
[query_results_call, query_results_call, reload_call]
)

def test_result_w_empty_schema(self):
from google.cloud.bigquery.table import _EmptyRowIterator

Expand Down

0 comments on commit 284e17a

Please sign in to comment.