diff --git a/google/cloud/bigquery/_helpers.py b/google/cloud/bigquery/_helpers.py index 668b4ca3d..5ee5e1850 100644 --- a/google/cloud/bigquery/_helpers.py +++ b/google/cloud/bigquery/_helpers.py @@ -33,6 +33,8 @@ from google.auth import credentials as ga_credentials # type: ignore from google.api_core import client_options as client_options_lib +TimeoutType = Union[float, None] + _RFC3339_MICROS_NO_ZULU = "%Y-%m-%dT%H:%M:%S.%f" _TIMEONLY_WO_MICROS = "%H:%M:%S" _TIMEONLY_W_MICROS = "%H:%M:%S.%f" diff --git a/google/cloud/bigquery/_job_helpers.py b/google/cloud/bigquery/_job_helpers.py index 290439394..e66ab2763 100644 --- a/google/cloud/bigquery/_job_helpers.py +++ b/google/cloud/bigquery/_job_helpers.py @@ -39,7 +39,7 @@ import functools import os import uuid -from typing import Any, Dict, TYPE_CHECKING, Optional +from typing import Any, Dict, Optional, TYPE_CHECKING, Union import google.api_core.exceptions as core_exceptions from google.api_core import retry as retries @@ -47,6 +47,7 @@ from google.cloud.bigquery import job import google.cloud.bigquery.query from google.cloud.bigquery import table +from google.cloud.bigquery.retry import POLLING_DEFAULT_VALUE # Avoid circular imports if TYPE_CHECKING: # pragma: NO COVER @@ -328,7 +329,7 @@ def query_and_wait( location: Optional[str], project: str, api_timeout: Optional[float] = None, - wait_timeout: Optional[float] = None, + wait_timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE, retry: Optional[retries.Retry], job_retry: Optional[retries.Retry], page_size: Optional[int] = None, @@ -364,10 +365,12 @@ def query_and_wait( api_timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. - wait_timeout (Optional[float]): + wait_timeout (Optional[Union[float, object]]): The number of seconds to wait for the query to finish. If the query doesn't finish before this timeout, the client attempts - to cancel the query. + to cancel the query. If unset, the underlying Client.get_job() API + call has a default timeout, but we still wait indefinitely for the job to + finish. retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. This only applies to making RPC calls. It isn't used to retry failed jobs.
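Note on the change above: ``wait_timeout`` now defaults to the module-level sentinel ``POLLING_DEFAULT_VALUE`` (the ``object()`` instance exposed as ``google.api_core.future.polling.PollingFuture._DEFAULT_VALUE``) instead of ``None``, so the code can tell "caller did not pass a timeout" apart from an explicit ``None``. When the argument is left unset, each ``Client.get_job()`` call keeps its own default timeout while the overall polling loop waits indefinitely. A minimal, self-contained sketch of that sentinel pattern (names here are illustrative, not the library's):

    _UNSET = object()  # stands in for PollingFuture._DEFAULT_VALUE

    def forward_timeout(timeout=_UNSET):
        # Build kwargs for a downstream call: only forward an explicit value
        # (including None); an omitted argument lets the callee keep its own
        # default timeout.
        kwargs = {}
        if timeout is not _UNSET:
            kwargs["timeout"] = timeout
        return kwargs

    print(forward_timeout())      # {} -> callee's default timeout applies
    print(forward_timeout(None))  # {'timeout': None} -> wait indefinitely
    print(forward_timeout(30.0))  # {'timeout': 30.0}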
@@ -545,7 +548,7 @@ def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool: def _wait_or_cancel( job: job.QueryJob, api_timeout: Optional[float], - wait_timeout: Optional[float], + wait_timeout: Optional[Union[object, float]], retry: Optional[retries.Retry], page_size: Optional[int], max_results: Optional[int], diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 891a54e5c..4234767fe 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -82,6 +82,7 @@ from google.cloud.bigquery._helpers import _DEFAULT_UNIVERSE from google.cloud.bigquery._helpers import _validate_universe from google.cloud.bigquery._helpers import _get_client_universe +from google.cloud.bigquery._helpers import TimeoutType from google.cloud.bigquery._job_helpers import make_job_id as _make_job_id from google.cloud.bigquery.dataset import Dataset from google.cloud.bigquery.dataset import DatasetListItem @@ -107,6 +108,7 @@ DEFAULT_JOB_RETRY, DEFAULT_RETRY, DEFAULT_TIMEOUT, + DEFAULT_GET_JOB_TIMEOUT, ) from google.cloud.bigquery.routine import Routine from google.cloud.bigquery.routine import RoutineReference @@ -123,7 +125,6 @@ _versions_helpers.PANDAS_VERSIONS.try_import() ) # mypy check fails because pandas import is outside module, there are type: ignore comments related to this -TimeoutType = Union[float, None] ResumableTimeoutType = Union[ None, float, Tuple[float, float] ] # for resumable media methods @@ -2139,7 +2140,7 @@ def get_job( project: Optional[str] = None, location: Optional[str] = None, retry: retries.Retry = DEFAULT_RETRY, - timeout: TimeoutType = DEFAULT_TIMEOUT, + timeout: TimeoutType = DEFAULT_GET_JOB_TIMEOUT, ) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]: """Fetch a job for the project associated with this client. diff --git a/google/cloud/bigquery/job/base.py b/google/cloud/bigquery/job/base.py index 2641afea8..6f9726181 100644 --- a/google/cloud/bigquery/job/base.py +++ b/google/cloud/bigquery/job/base.py @@ -26,8 +26,11 @@ import google.api_core.future.polling from google.cloud.bigquery import _helpers -from google.cloud.bigquery.retry import DEFAULT_RETRY from google.cloud.bigquery._helpers import _int_or_none +from google.cloud.bigquery.retry import ( + DEFAULT_GET_JOB_TIMEOUT, + DEFAULT_RETRY, +) _DONE_STATE = "DONE" @@ -801,7 +804,7 @@ def reload( self, client=None, retry: "retries.Retry" = DEFAULT_RETRY, - timeout: Optional[float] = None, + timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT, ): """API call: refresh job properties via a GET request. @@ -820,22 +823,14 @@ """ client = self._require_client(client) - extra_params = {} - if self.location: - extra_params["location"] = self.location - span_attributes = {"path": self.path} - - api_response = client._call_api( - retry, - span_name="BigQuery.job.reload", - span_attributes=span_attributes, - job_ref=self, - method="GET", - path=self.path, - query_params=extra_params, + got_job = client.get_job( + self, + project=self.project, + location=self.location, + retry=retry, timeout=timeout, ) - self._set_properties(api_response) + self._set_properties(got_job._properties) def cancel( self, @@ -913,7 +908,7 @@ def _set_future_result(self): def done( self, retry: "retries.Retry" = DEFAULT_RETRY, - timeout: Optional[float] = None, + timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT, reload: bool = True, ) -> bool: """Checks if the job is complete.
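The net effect of the ``client.py`` and ``job/base.py`` changes above: ``Client.get_job()``, ``_AsyncJob.reload()``, and ``_AsyncJob.done()`` now default to the new ``DEFAULT_GET_JOB_TIMEOUT`` (128 seconds per GET request), and ``reload()`` is routed through ``Client.get_job()`` so both paths share the same ``BigQuery.getJob`` span and ``projection=full`` query parameter. A hedged usage sketch (assumes application-default credentials and an existing job ID; not part of the patch):

    from google.cloud import bigquery
    from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT  # 128 seconds

    client = bigquery.Client()
    job = client.get_job("existing-job-id", location="US")  # per-request timeout defaults to 128s
    job.reload()             # same default, now delegated to client.get_job()
    job.done(timeout=None)   # an explicit None still disables the per-request HTTP timeout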
diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 09a69e11c..25b89c3d7 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -40,7 +40,11 @@ StructQueryParameter, UDFResource, ) -from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY +from google.cloud.bigquery.retry import ( + DEFAULT_RETRY, + DEFAULT_JOB_RETRY, + POLLING_DEFAULT_VALUE, +) from google.cloud.bigquery.routine import RoutineReference from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import _EmptyRowIterator @@ -1437,7 +1441,7 @@ def result( # type: ignore # (incompatible with supertype) page_size: Optional[int] = None, max_results: Optional[int] = None, retry: Optional[retries.Retry] = DEFAULT_RETRY, - timeout: Optional[float] = None, + timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE, start_index: Optional[int] = None, job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY, ) -> Union["RowIterator", _EmptyRowIterator]: @@ -1457,11 +1461,14 @@ def result( # type: ignore # (incompatible with supertype) is ``DONE``, retrying is aborted early even if the results are not available, as this will not change anymore. - timeout (Optional[float]): + timeout (Optional[Union[float, \ + google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \ + ]]): The number of seconds to wait for the underlying HTTP transport - before using ``retry``. - If multiple requests are made under the hood, ``timeout`` - applies to each individual request. + before using ``retry``. If ``None``, wait indefinitely + unless an error is returned. If unset, only the + underlying API calls have their default timeouts, but we still + wait indefinitely for the job to finish. start_index (Optional[int]): The zero-based index of the starting row to read. job_retry (Optional[google.api_core.retry.Retry]): @@ -1507,6 +1514,13 @@ def result( # type: ignore # (incompatible with supertype) # Intentionally omit job_id and query_id since this doesn't # actually correspond to a finished query job. ) + + # When timeout has default sentinel value ``object()``, do not pass + # anything to invoke default timeouts in subsequent calls. + kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {} + if type(timeout) is not object: + kwargs["timeout"] = timeout + try: retry_do_query = getattr(self, "_retry_do_query", None) if retry_do_query is not None: @@ -1548,7 +1562,7 @@ def is_job_done(): # rateLimitExceeded errors are ambiguous. We want to know if # the query job failed and not just the call to # jobs.getQueryResults. - if self.done(retry=retry, timeout=timeout): + if self.done(retry=retry, **kwargs): # If it's already failed, we might as well stop. job_failed_exception = self.exception() if job_failed_exception is not None: @@ -1585,14 +1599,14 @@ def is_job_done(): # response from the REST API. This ensures we aren't # making any extra API calls if the previous loop # iteration fetched the finished job. - self._reload_query_results(retry=retry, timeout=timeout) + self._reload_query_results(retry=retry, **kwargs) return True # Call jobs.getQueryResults with max results set to 0 just to # wait for the query to finish. Unlike most methods, # jobs.getQueryResults hangs as long as it can to ensure we # know when the query has finished as soon as possible. 
- self._reload_query_results(retry=retry, timeout=timeout) + self._reload_query_results(retry=retry, **kwargs) # Even if the query is finished now according to # jobs.getQueryResults, we'll want to reload the job status if @@ -1682,10 +1696,10 @@ def is_job_done(): max_results=max_results, start_index=start_index, retry=retry, - timeout=timeout, query_id=self.query_id, first_page_response=first_page_response, num_dml_affected_rows=self._query_results.num_dml_affected_rows, + **kwargs, ) rows._preserve_order = _contains_order_by(self.query) return rows diff --git a/google/cloud/bigquery/retry.py b/google/cloud/bigquery/retry.py index 111034519..10958980d 100644 --- a/google/cloud/bigquery/retry.py +++ b/google/cloud/bigquery/retry.py @@ -14,6 +14,7 @@ from google.api_core import exceptions from google.api_core import retry +import google.api_core.future.polling from google.auth import exceptions as auth_exceptions # type: ignore import requests.exceptions @@ -140,3 +141,13 @@ def _job_should_retry(exc): """ The default job retry object. """ + +DEFAULT_GET_JOB_TIMEOUT = 128 +""" +Default timeout for Client.get_job(). +""" + +POLLING_DEFAULT_VALUE = google.api_core.future.polling.PollingFuture._DEFAULT_VALUE +""" +Default value defined in google.api_core.future.polling.PollingFuture. +""" diff --git a/tests/unit/job/test_base.py b/tests/unit/job/test_base.py index 186729529..a7337afd2 100644 --- a/tests/unit/job/test_base.py +++ b/tests/unit/job/test_base.py @@ -22,6 +22,8 @@ from google.api_core.future import polling import pytest +from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT + from ..helpers import make_connection from .helpers import _make_client @@ -709,7 +711,7 @@ def test_exists_w_timeout(self): ) def test_reload_defaults(self): - from google.cloud.bigquery.retry import DEFAULT_RETRY + from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_GET_JOB_TIMEOUT resource = { "jobReference": { @@ -729,15 +731,19 @@ def test_reload_defaults(self): call_api.assert_called_once_with( DEFAULT_RETRY, - span_name="BigQuery.job.reload", + span_name="BigQuery.getJob", span_attributes={ - "path": "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID) + "path": "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID), + "job_id": "job-id", + "location": "us-central", }, - job_ref=job, method="GET", path="/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID), - query_params={"location": self.LOCATION}, - timeout=None, + query_params={ + "projection": "full", + "location": "us-central", + }, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) self.assertEqual(job._properties, expected) @@ -764,18 +770,43 @@ def test_reload_explicit(self): call_api.assert_called_once_with( retry, - span_name="BigQuery.job.reload", + span_name="BigQuery.getJob", span_attributes={ - "path": "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID) + "path": "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID), + "job_id": "job-id", + "location": None, }, - job_ref=job, method="GET", path="/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID), - query_params={}, + query_params={"projection": "full"}, timeout=4.2, ) self.assertEqual(job._properties, expected) + def test_reload_none_timeout(self): + from google.cloud.bigquery.retry import DEFAULT_RETRY + + resource = { + "jobReference": { + "jobId": self.JOB_ID, + "projectId": self.PROJECT, + "location": None, + }, + "configuration": {"test": True}, + } + client = _make_client(project=self.PROJECT) + conn = client._connection = make_connection(resource) + job 
= self._set_properties_job() + retry = DEFAULT_RETRY.with_deadline(1) + job.reload(client=client, retry=retry, timeout=None) + + conn.api_request.assert_called_once_with( + method="GET", + path="/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID), + query_params={"projection": "full"}, + timeout=None, + ) + def test_cancel_defaults(self): resource = { "jobReference": { @@ -952,7 +983,10 @@ def test_done_defaults_wo_state(self): self.assertFalse(job.done()) - reload_.assert_called_once_with(retry=DEFAULT_RETRY, timeout=None) + reload_.assert_called_once_with( + retry=DEFAULT_RETRY, + timeout=DEFAULT_GET_JOB_TIMEOUT, + ) def test_done_explicit_wo_state(self): from google.cloud.bigquery.retry import DEFAULT_RETRY @@ -966,6 +1000,18 @@ def test_done_explicit_wo_state(self): reload_.assert_called_once_with(retry=retry, timeout=7.5) + def test_done_with_none_timeout(self): + from google.cloud.bigquery.retry import DEFAULT_RETRY + + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + reload_ = job.reload = mock.Mock() + retry = DEFAULT_RETRY.with_deadline(1) + + self.assertFalse(job.done(retry=retry, timeout=None)) + + reload_.assert_called_once_with(retry=retry, timeout=None) + def test_done_already(self): client = _make_client(project=self.PROJECT) job = self._make_one(self.JOB_ID, client) @@ -974,6 +1020,8 @@ def test_done_already(self): self.assertTrue(job.done()) def test_result_default_wo_state(self): + from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT + begun_job_resource = _make_job_resource( job_id=self.JOB_ID, project_id=self.PROJECT, location="US", started=True ) @@ -1003,12 +1051,17 @@ def test_result_default_wo_state(self): reload_call = mock.call( method="GET", path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}", - query_params={"location": "US"}, - timeout=None, + query_params={ + "projection": "full", + "location": "US", + }, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) conn.api_request.assert_has_calls([begin_call, begin_call, reload_call]) def test_result_w_retry_wo_state(self): + from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT + begun_job_resource = _make_job_resource( job_id=self.JOB_ID, project_id=self.PROJECT, location="EU", started=True ) @@ -1054,8 +1107,11 @@ def test_result_w_retry_wo_state(self): reload_call = mock.call( method="GET", path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}", - query_params={"location": "EU"}, - timeout=None, + query_params={ + "projection": "full", + "location": "EU", + }, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) conn.api_request.assert_has_calls( [begin_call, begin_call, reload_call, reload_call] diff --git a/tests/unit/job/test_copy.py b/tests/unit/job/test_copy.py index e1bb20db2..4b0945310 100644 --- a/tests/unit/job/test_copy.py +++ b/tests/unit/job/test_copy.py @@ -477,6 +477,8 @@ def test_exists_hit_w_alternate_client(self): ) def test_reload_w_bound_client(self): + from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT + PATH = "/projects/%s/jobs/%s" % (self.PROJECT, self.JOB_ID) RESOURCE = self._make_resource() conn = make_connection(RESOURCE) @@ -489,14 +491,27 @@ def test_reload_w_bound_client(self): ) as final_attributes: job.reload() - final_attributes.assert_called_with({"path": PATH}, client, job) + final_attributes.assert_called_with( + { + "path": PATH, + "job_id": self.JOB_ID, + "location": None, + }, + client, + None, + ) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={}, timeout=None + method="GET", + path=PATH, + 
query_params={"projection": "full"}, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) self._verifyResourceProperties(job, RESOURCE) def test_reload_w_alternate_client(self): + from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT + PATH = "/projects/%s/jobs/%s" % (self.PROJECT, self.JOB_ID) RESOURCE = self._make_resource() conn1 = make_connection() @@ -511,10 +526,21 @@ def test_reload_w_alternate_client(self): ) as final_attributes: job.reload(client=client2) - final_attributes.assert_called_with({"path": PATH}, client2, job) + final_attributes.assert_called_with( + { + "path": PATH, + "job_id": self.JOB_ID, + "location": None, + }, + client2, + None, + ) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={}, timeout=None + method="GET", + path=PATH, + query_params={"projection": "full"}, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) self._verifyResourceProperties(job, RESOURCE) diff --git a/tests/unit/job/test_extract.py b/tests/unit/job/test_extract.py index ee0d67d68..ebf9f09e6 100644 --- a/tests/unit/job/test_extract.py +++ b/tests/unit/job/test_extract.py @@ -399,6 +399,7 @@ def test_exists_hit_w_alternate_client(self): def test_reload_w_bound_client(self): from google.cloud.bigquery.dataset import DatasetReference + from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT PATH = "/projects/%s/jobs/%s" % (self.PROJECT, self.JOB_ID) RESOURCE = self._make_resource() @@ -412,14 +413,26 @@ def test_reload_w_bound_client(self): ) as final_attributes: job.reload() - final_attributes.assert_called_with({"path": PATH}, client, job) + final_attributes.assert_called_with( + { + "path": PATH, + "job_id": self.JOB_ID, + "location": None, + }, + client, + None, + ) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={}, timeout=None + method="GET", + path=PATH, + query_params={"projection": "full"}, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) self._verifyResourceProperties(job, RESOURCE) def test_reload_w_alternate_client(self): from google.cloud.bigquery.dataset import DatasetReference + from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT PATH = "/projects/%s/jobs/%s" % (self.PROJECT, self.JOB_ID) RESOURCE = self._make_resource() @@ -435,10 +448,21 @@ def test_reload_w_alternate_client(self): ) as final_attributes: job.reload(client=client2) - final_attributes.assert_called_with({"path": PATH}, client2, job) + final_attributes.assert_called_with( + { + "path": PATH, + "job_id": self.JOB_ID, + "location": None, + }, + client2, + None, + ) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={}, timeout=None + method="GET", + path=PATH, + query_params={"projection": "full"}, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) self._verifyResourceProperties(job, RESOURCE) diff --git a/tests/unit/job/test_load.py b/tests/unit/job/test_load.py index 976fec914..0fb044696 100644 --- a/tests/unit/job/test_load.py +++ b/tests/unit/job/test_load.py @@ -714,6 +714,8 @@ def test_exists_miss_w_job_reference(self): ) def test_reload_w_bound_client(self): + from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT + PATH = "/projects/%s/jobs/%s" % (self.PROJECT, self.JOB_ID) RESOURCE = self._make_resource() conn = make_connection(RESOURCE) @@ -724,14 +726,27 @@ def test_reload_w_bound_client(self): ) as final_attributes: job.reload() - final_attributes.assert_called_with({"path": PATH}, client, job) + final_attributes.assert_called_with( + { + "path": 
PATH, + "job_id": self.JOB_ID, + "location": None, + }, + client, + None, + ) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={}, timeout=None + method="GET", + path=PATH, + query_params={"projection": "full"}, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) self._verifyResourceProperties(job, RESOURCE) def test_reload_w_alternate_client(self): + from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT + PATH = "/projects/%s/jobs/%s" % (self.PROJECT, self.JOB_ID) RESOURCE = self._make_resource() conn1 = make_connection() @@ -744,16 +759,28 @@ def test_reload_w_alternate_client(self): ) as final_attributes: job.reload(client=client2) - final_attributes.assert_called_with({"path": PATH}, client2, job) + final_attributes.assert_called_with( + { + "path": PATH, + "job_id": self.JOB_ID, + "location": None, + }, + client2, + None, + ) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={}, timeout=None + method="GET", + path=PATH, + query_params={"projection": "full"}, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) self._verifyResourceProperties(job, RESOURCE) def test_reload_w_job_reference(self): from google.cloud.bigquery import job + from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT resource = self._make_resource(ended=True) resource["jobReference"]["projectId"] = "alternative-project" @@ -768,16 +795,20 @@ def test_reload_w_job_reference(self): load_job.reload() final_attributes.assert_called_with( - {"path": "/projects/alternative-project/jobs/{}".format(self.JOB_ID)}, + { + "path": "/projects/alternative-project/jobs/{}".format(self.JOB_ID), + "job_id": self.JOB_ID, + "location": "US", + }, client, - load_job, + None, ) conn.api_request.assert_called_once_with( method="GET", path="/projects/alternative-project/jobs/{}".format(self.JOB_ID), - query_params={"location": "US"}, - timeout=None, + query_params={"projection": "full", "location": "US"}, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) def test_cancel_w_bound_client(self): diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py index 0fee053e3..c7b2c5f9c 100644 --- a/tests/unit/job/test_query.py +++ b/tests/unit/job/test_query.py @@ -28,6 +28,7 @@ from google.cloud.bigquery.client import _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS import google.cloud.bigquery._job_helpers import google.cloud.bigquery.query +from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT from google.cloud.bigquery.table import _EmptyRowIterator from ..helpers import make_connection @@ -959,8 +960,8 @@ def test_result_reloads_job_state_until_done(self): reload_call = mock.call( method="GET", path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}", - query_params={"location": "EU"}, - timeout=None, + query_params={"projection": "full", "location": "EU"}, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) query_page_call = mock.call( method="GET", @@ -1104,7 +1105,37 @@ def test_result_with_done_jobs_query_response_doesnt_call_get_query_results(self conn.api_request.assert_called_once_with( method="GET", path=job_path, - query_params={}, + query_params={"projection": "full"}, + timeout=DEFAULT_GET_JOB_TIMEOUT, + ) + + def test_result_with_none_timeout(self): + # Verifies that with an intentional None timeout, get job uses None + # instead of the default timeout. 
+ job_resource = self._make_resource(started=True, ended=True, location="EU") + conn = make_connection(job_resource) + client = _make_client(self.PROJECT, connection=conn) + query_resource_done = { + "jobComplete": True, + "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, + "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, + "rows": [{"f": [{"v": "abc"}]}], + "totalRows": "1", + } + job = google.cloud.bigquery._job_helpers._to_query_job( + client, + "SELECT 'abc' AS col1", + request_config=None, + query_response=query_resource_done, + ) + + job.result(timeout=None) + + job_path = f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}" + conn.api_request.assert_called_once_with( + method="GET", + path=job_path, + query_params={"projection": "full"}, timeout=None, ) @@ -1287,8 +1318,8 @@ def test_result_w_custom_retry(self): reload_call = mock.call( method="GET", path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}", - query_params={"location": "asia-northeast1"}, - timeout=None, + query_params={"projection": "full", "location": "asia-northeast1"}, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) connection.api_request.assert_has_calls( @@ -1367,7 +1398,7 @@ def test_result_w_timeout_doesnt_raise(self): reload_call = mock.call( method="GET", path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}", - query_params={"location": "US"}, + query_params={"projection": "full", "location": "US"}, timeout=1.125, ) get_query_results_call = mock.call( @@ -1412,7 +1443,7 @@ def test_result_w_timeout_raises_concurrent_futures_timeout(self): reload_call = mock.call( method="GET", path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}", - query_params={"location": "US"}, + query_params={"projection": "full", "location": "US"}, timeout=1.125, ) get_query_results_call = mock.call( @@ -2160,12 +2191,23 @@ def test_reload_w_bound_client(self): ) as final_attributes: job.reload() - final_attributes.assert_called_with({"path": PATH}, client, job) + final_attributes.assert_called_with( + { + "path": PATH, + "job_id": self.JOB_ID, + "location": None, + }, + client, + None, + ) self.assertNotEqual(job.destination, table_ref) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={}, timeout=None + method="GET", + path=PATH, + query_params={"projection": "full"}, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) self._verifyResourceProperties(job, RESOURCE) @@ -2190,11 +2232,22 @@ def test_reload_w_alternate_client(self): ) as final_attributes: job.reload(client=client2) - final_attributes.assert_called_with({"path": PATH}, client2, job) + final_attributes.assert_called_with( + { + "path": PATH, + "job_id": self.JOB_ID, + "location": None, + }, + client2, + None, + ) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={}, timeout=None + method="GET", + path=PATH, + query_params={"projection": "full"}, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) self._verifyResourceProperties(job, RESOURCE) @@ -2217,13 +2270,23 @@ def test_reload_w_timeout(self): "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" ) as final_attributes: job.reload(timeout=4.2) - - final_attributes.assert_called_with({"path": PATH}, client, job) + final_attributes.assert_called_with( + { + "path": PATH, + "job_id": self.JOB_ID, + "location": None, + }, + client, + None, + ) self.assertNotEqual(job.destination, table_ref) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={}, timeout=4.2 + method="GET", + path=PATH, + 
query_params={"projection": "full"}, + timeout=4.2, ) def test_iter(self): diff --git a/tests/unit/test__job_helpers.py b/tests/unit/test__job_helpers.py index 9f661dca7..96914d9f9 100644 --- a/tests/unit/test__job_helpers.py +++ b/tests/unit/test__job_helpers.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import functools from typing import Any, Dict, Optional from unittest import mock @@ -21,15 +20,18 @@ from google.api_core import retry as retries import pytest -from google.cloud.bigquery.client import Client -from google.cloud.bigquery import enums from google.cloud.bigquery import _job_helpers +from google.cloud.bigquery import enums +from google.cloud.bigquery import retry +from google.cloud.bigquery.client import Client from google.cloud.bigquery.job import copy_ as job_copy from google.cloud.bigquery.job import extract as job_extract from google.cloud.bigquery.job import load as job_load from google.cloud.bigquery.job import query as job_query from google.cloud.bigquery.query import ConnectionProperty, ScalarQueryParameter +from .helpers import make_client, make_connection + def make_query_request(additional_properties: Optional[Dict[str, Any]] = None): request = {"useLegacySql": False, "formatOptions": {"useInt64Timestamp": True}} @@ -806,11 +808,8 @@ def test_query_and_wait_caches_completed_query_results_one_page_no_rows(): def test_query_and_wait_caches_completed_query_results_more_pages(): - client = mock.create_autospec(Client) - client._list_rows_from_query_results = functools.partial( - Client._list_rows_from_query_results, client - ) - client._call_api.side_effect = ( + client = make_client() + conn = client._connection = make_connection( { "jobReference": { "projectId": "response-project", @@ -882,10 +881,7 @@ def test_query_and_wait_caches_completed_query_results_more_pages(): # Start the query. jobs_query_path = "/projects/request-project/queries" - client._call_api.assert_any_call( - None, # retry - span_name="BigQuery.query", - span_attributes={"path": jobs_query_path}, + conn.api_request.assert_any_call( method="POST", path=jobs_query_path, data={ @@ -906,8 +902,7 @@ def test_query_and_wait_caches_completed_query_results_more_pages(): # Fetch the remaining two pages. jobs_get_query_results_path = "/projects/response-project/queries/response-job-id" - client._call_api.assert_any_call( - None, # retry + conn.api_request.assert_any_call( timeout=None, method="GET", path=jobs_get_query_results_path, @@ -918,8 +913,7 @@ def test_query_and_wait_caches_completed_query_results_more_pages(): "formatOptions.useInt64Timestamp": True, }, ) - client._call_api.assert_any_call( - None, # retry + conn.api_request.assert_any_call( timeout=None, method="GET", path=jobs_get_query_results_path, @@ -933,12 +927,8 @@ def test_query_and_wait_caches_completed_query_results_more_pages(): def test_query_and_wait_incomplete_query(): - client = mock.create_autospec(Client) - client._get_query_results = functools.partial(Client._get_query_results, client) - client._list_rows_from_query_results = functools.partial( - Client._list_rows_from_query_results, client - ) - client._call_api.side_effect = ( + client = make_client() + conn = client._connection = make_connection( # jobs.query { "jobReference": { @@ -1022,10 +1012,7 @@ def test_query_and_wait_incomplete_query(): # Start the query. 
jobs_query_path = "/projects/request-project/queries" - client._call_api.assert_any_call( - None, # retry - span_name="BigQuery.query", - span_attributes={"path": jobs_query_path}, + conn.api_request.assert_any_call( method="POST", path=jobs_query_path, data={ @@ -1041,10 +1028,7 @@ def test_query_and_wait_incomplete_query(): # Wait for the query to finish. jobs_get_query_results_path = "/projects/response-project/queries/response-job-id" - client._call_api.assert_any_call( - None, # retry - span_name="BigQuery.getQueryResults", - span_attributes={"path": jobs_get_query_results_path}, + conn.api_request.assert_any_call( method="GET", path=jobs_get_query_results_path, query_params={ @@ -1063,20 +1047,15 @@ def test_query_and_wait_incomplete_query(): # Fetch the job metadata in case the RowIterator needs the destination table. jobs_get_path = "/projects/response-project/jobs/response-job-id" - client._call_api.assert_any_call( - None, # retry - span_name="BigQuery.job.reload", - span_attributes={"path": jobs_get_path}, - job_ref=mock.ANY, + conn.api_request.assert_any_call( method="GET", path=jobs_get_path, - query_params={"location": "response-location"}, - timeout=None, + query_params={"projection": "full", "location": "response-location"}, + timeout=retry.DEFAULT_GET_JOB_TIMEOUT, ) # Fetch the remaining two pages. - client._call_api.assert_any_call( - None, # retry + conn.api_request.assert_any_call( timeout=None, method="GET", path=jobs_get_query_results_path, @@ -1086,8 +1065,7 @@ def test_query_and_wait_incomplete_query(): "formatOptions.useInt64Timestamp": True, }, ) - client._call_api.assert_any_call( - None, # retry + conn.api_request.assert_any_call( timeout=None, method="GET", path=jobs_get_query_results_path, diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index a5434019b..ed5575f6c 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -3167,6 +3167,7 @@ def test_job_from_resource_unknown_type(self): def test_get_job_miss_w_explict_project(self): from google.cloud.exceptions import NotFound + from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT OTHER_PROJECT = "OTHER_PROJECT" JOB_ID = "NONESUCH" @@ -3181,11 +3182,12 @@ def test_get_job_miss_w_explict_project(self): method="GET", path="/projects/OTHER_PROJECT/jobs/NONESUCH", query_params={"projection": "full"}, - timeout=DEFAULT_TIMEOUT, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) def test_get_job_miss_w_client_location(self): from google.cloud.exceptions import NotFound + from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT JOB_ID = "NONESUCH" creds = _make_credentials() @@ -3199,7 +3201,7 @@ def test_get_job_miss_w_client_location(self): method="GET", path="/projects/client-proj/jobs/NONESUCH", query_params={"projection": "full", "location": "client-loc"}, - timeout=DEFAULT_TIMEOUT, + timeout=DEFAULT_GET_JOB_TIMEOUT, ) def test_get_job_hit_w_timeout(self): diff --git a/tests/unit/test_job_retry.py b/tests/unit/test_job_retry.py index 2dcc5878d..46eb1d6b3 100644 --- a/tests/unit/test_job_retry.py +++ b/tests/unit/test_job_retry.py @@ -23,85 +23,93 @@ import freezegun import requests.exceptions -from google.cloud.bigquery.client import Client from google.cloud.bigquery import _job_helpers import google.cloud.bigquery.retry -from .helpers import make_connection +from .helpers import make_client, make_connection -# With job_retry_on_query, we're testing 4 scenarios: +_RETRY_NOT_FOUND = { + "job_retry": google.api_core.retry.Retry( + 
predicate=google.api_core.retry.if_exception_type( + google.api_core.exceptions.NotFound, + ), + ), +} +_RETRY_BAD_REQUEST = { + "job_retry": google.api_core.retry.Retry( + predicate=google.api_core.retry.if_exception_type( + google.api_core.exceptions.BadRequest, + ), + ), +} + + +# Test retry of job failures, instead of API-invocation failures. 4 scenarios: # - No `job_retry` passed, retry on default rateLimitExceeded. # - Pass NotFound retry to `query`. # - Pass NotFound retry to `result`. # - Pass BadRequest retry to query, with the value passed to `result` overriding. -@pytest.mark.parametrize("job_retry_on_query", [None, "Query", "Result", "Both"]) @mock.patch("time.sleep") -def test_retry_failed_jobs(sleep, client, job_retry_on_query): - """ - Test retry of job failures, as opposed to API-invocation failures. - """ - - retry_notfound = google.api_core.retry.Retry( - predicate=google.api_core.retry.if_exception_type( - google.api_core.exceptions.NotFound - ) - ) - retry_badrequest = google.api_core.retry.Retry( - predicate=google.api_core.retry.if_exception_type( - google.api_core.exceptions.BadRequest - ) - ) - - if job_retry_on_query is None: - reason = "rateLimitExceeded" - else: - reason = "notFound" - +@pytest.mark.parametrize( + "reason, job_retry, result_retry", + [ + pytest.param( + "rateLimitExceeded", + {}, + {}, + id="no job_retry", + ), + pytest.param( + "notFound", + _RETRY_NOT_FOUND, + {}, + id="Query NotFound", + ), + pytest.param( + "notFound", + _RETRY_NOT_FOUND, + _RETRY_NOT_FOUND, + id="Result NotFound", + ), + pytest.param( + "notFound", + _RETRY_BAD_REQUEST, + _RETRY_NOT_FOUND, + id="BadRequest", + ), + ], +) +def test_retry_failed_jobs(sleep, reason, job_retry, result_retry): + client = make_client() err = dict(reason=reason) - responses = [ - dict(status=dict(state="DONE", errors=[err], errorResult=err)), - dict(status=dict(state="DONE", errors=[err], errorResult=err)), - dict(status=dict(state="DONE", errors=[err], errorResult=err)), - dict(status=dict(state="DONE")), + conn = client._connection = make_connection( + dict( + status=dict(state="DONE", errors=[err], errorResult=err), + jobReference={"jobId": "id_1"}, + ), + dict( + status=dict(state="DONE", errors=[err], errorResult=err), + jobReference={"jobId": "id_1"}, + ), + dict( + status=dict(state="DONE", errors=[err], errorResult=err), + jobReference={"jobId": "id_1"}, + ), + dict(status=dict(state="DONE"), jobReference={"jobId": "id_2"}), dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"), - ] - - def api_request(method, path, query_params=None, data=None, **kw): - response = responses.pop(0) - if data: - response["jobReference"] = data["jobReference"] - else: - response["jobReference"] = dict( - jobId=path.split("/")[-1], projectId="PROJECT" - ) - return response - - conn = client._connection = make_connection() - conn.api_request.side_effect = api_request + ) - if job_retry_on_query == "Query": - job_retry = dict(job_retry=retry_notfound) - elif job_retry_on_query == "Both": - # This will be overridden in `result` - job_retry = dict(job_retry=retry_badrequest) - else: - job_retry = {} job = client.query("select 1", **job_retry) + result = job.result(**result_retry) - orig_job_id = job.job_id - job_retry = ( - dict(job_retry=retry_notfound) - if job_retry_on_query in ("Result", "Both") - else {} - ) - result = job.result(**job_retry) assert result.total_rows == 1 - assert not responses # We made all the calls we expected to. + + # We made all the calls we expected to. 
+ assert conn.api_request.call_count == 5 # The job adjusts it's job id based on the id of the last attempt. - assert job.job_id != orig_job_id - assert job.job_id == conn.mock_calls[3][2]["data"]["jobReference"]["jobId"] + assert job.job_id == "id_2" # We had to sleep three times assert len(sleep.mock_calls) == 3 @@ -114,17 +122,19 @@ def api_request(method, path, query_params=None, data=None, **kw): assert max(c[1][0] for c in sleep.mock_calls) <= 8 # We can ask for the result again: - responses = [ + conn = client._connection = make_connection( dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"), - ] - orig_job_id = job.job_id + ) result = job.result() + assert result.total_rows == 1 - assert not responses # We made all the calls we expected to. + + # We made all the calls we expected to. + assert conn.api_request.call_count == 1 # We wouldn't (and didn't) fail, because we're dealing with a successful job. # So the job id hasn't changed. - assert job.job_id == orig_job_id + assert job.job_id == "id_2" def test_retry_connection_error_with_default_retries_and_successful_first_job( @@ -209,8 +219,8 @@ def make_job_id(*args, **kwargs): mock.call( method="GET", path="/projects/PROJECT/jobs/1", - query_params={"location": "test-loc"}, - timeout=None, + query_params={"location": "test-loc", "projection": "full"}, + timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT, ), # jobs.getQueryResults x2 mock.call( @@ -229,8 +239,8 @@ def make_job_id(*args, **kwargs): mock.call( method="GET", path="/projects/PROJECT/jobs/1", - query_params={"location": "test-loc"}, - timeout=None, + query_params={"location": "test-loc", "projection": "full"}, + timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT, ), # jobs.getQueryResults mock.call( @@ -307,8 +317,7 @@ def make_job_id(*args, **kwargs): {"jobReference": job_reference_2, "status": {"state": "DONE"}}, ] - conn = client._connection = make_connection() - conn.api_request.side_effect = responses + conn = client._connection = make_connection(*responses) with freezegun.freeze_time( # Note: because of exponential backoff and a bit of jitter, @@ -341,8 +350,8 @@ def make_job_id(*args, **kwargs): mock.call( method="GET", path="/projects/PROJECT/jobs/1", - query_params={"location": "test-loc"}, - timeout=None, + query_params={"location": "test-loc", "projection": "full"}, + timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT, ), # jobs.getQueryResults x2 mock.call( @@ -361,8 +370,8 @@ def make_job_id(*args, **kwargs): mock.call( method="GET", path="/projects/PROJECT/jobs/1", - query_params={"location": "test-loc"}, - timeout=None, + query_params={"location": "test-loc", "projection": "full"}, + timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT, ), # jobs.insert mock.call( @@ -384,8 +393,8 @@ def make_job_id(*args, **kwargs): mock.call( method="GET", path="/projects/PROJECT/jobs/2", - query_params={"location": "test-loc"}, - timeout=None, + query_params={"location": "test-loc", "projection": "full"}, + timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT, ), # jobs.getQueryResults mock.call( @@ -398,8 +407,8 @@ def make_job_id(*args, **kwargs): mock.call( method="GET", path="/projects/PROJECT/jobs/2", - query_params={"location": "test-loc"}, - timeout=None, + query_params={"location": "test-loc", "projection": "full"}, + timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT, ), ] ) @@ -531,12 +540,9 @@ def test_query_and_wait_retries_job_for_DDL_queries(): https://github.com/googleapis/python-bigquery/issues/1790 """ 
freezegun.freeze_time(auto_tick_seconds=1) - client = mock.create_autospec(Client) - client._call_api.__name__ = "_call_api" - client._call_api.__qualname__ = "Client._call_api" - client._call_api.__annotations__ = {} - client._call_api.__type_params__ = () - client._call_api.side_effect = ( + + client = make_client() + conn = client._connection = make_connection( { "jobReference": { "projectId": "response-project", @@ -589,7 +595,7 @@ def test_query_and_wait_retries_job_for_DDL_queries(): # and https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults query_request_path = "/projects/request-project/queries" - calls = client._call_api.call_args_list + calls = conn.api_request.call_args_list _, kwargs = calls[0] assert kwargs["method"] == "POST" assert kwargs["path"] == query_request_path
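A closing note on the test refactors above: the retry tests drop ``mock.create_autospec(Client)`` in favor of a real ``Client`` wired to a stubbed connection through the shared ``make_client``/``make_connection`` helpers, so assertions run against ``conn.api_request`` calls rather than ``client._call_api``. A rough sketch of how such a helper behaves (assumed; the real one lives in ``tests/unit/helpers.py`` and configures more of the connection object):

    from unittest import mock

    def make_connection_sketch(*responses):
        # Each positional dict is returned, in order, as the JSON payload of
        # one conn.api_request(...) call.
        conn = mock.Mock(name="Connection")
        conn.api_request.side_effect = list(responses)
        return conn

    conn = make_connection_sketch({"status": {"state": "DONE"}}, {"totalRows": "1"})
    assert conn.api_request(method="GET", path="/jobs/x")["status"]["state"] == "DONE"
    assert conn.api_request(method="GET", path="/queries/x")["totalRows"] == "1"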