feat: add default timeout for Client.get_job() #1935

Merged: 24 commits, May 31, 2024

Changes from all commits
2 changes: 2 additions & 0 deletions google/cloud/bigquery/_helpers.py
@@ -33,6 +33,8 @@
from google.auth import credentials as ga_credentials # type: ignore
from google.api_core import client_options as client_options_lib

TimeoutType = Union[float, None]

_RFC3339_MICROS_NO_ZULU = "%Y-%m-%dT%H:%M:%S.%f"
_TIMEONLY_WO_MICROS = "%H:%M:%S"
_TIMEONLY_W_MICROS = "%H:%M:%S.%f"
13 changes: 8 additions & 5 deletions google/cloud/bigquery/_job_helpers.py
@@ -39,14 +39,15 @@
import functools
import os
import uuid
from typing import Any, Dict, TYPE_CHECKING, Optional
from typing import Any, Dict, Optional, TYPE_CHECKING, Union

import google.api_core.exceptions as core_exceptions
from google.api_core import retry as retries

from google.cloud.bigquery import job
import google.cloud.bigquery.query
from google.cloud.bigquery import table
from google.cloud.bigquery.retry import POLLING_DEFAULT_VALUE

# Avoid circular imports
if TYPE_CHECKING: # pragma: NO COVER
@@ -328,7 +329,7 @@ def query_and_wait(
location: Optional[str],
project: str,
api_timeout: Optional[float] = None,
wait_timeout: Optional[float] = None,
wait_timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
retry: Optional[retries.Retry],
job_retry: Optional[retries.Retry],
page_size: Optional[int] = None,
@@ -364,10 +365,12 @@ def query_and_wait(
api_timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
wait_timeout (Optional[float]):
wait_timeout (Optional[Union[float, object]]):
The number of seconds to wait for the query to finish. If the
query doesn't finish before this timeout, the client attempts
to cancel the query.
to cancel the query. If unset, the underlying Client.get_job() API
call has a timeout, but we still wait indefinitely for the job to
finish.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC. This only applies to making RPC
calls. It isn't used to retry failed jobs. This has
@@ -545,7 +548,7 @@ def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool:
def _wait_or_cancel(
job: job.QueryJob,
api_timeout: Optional[float],
wait_timeout: Optional[float],
wait_timeout: Optional[Union[object, float]],
retry: Optional[retries.Retry],
page_size: Optional[int],
max_results: Optional[int],
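
Note on the new ``wait_timeout`` default: the sentinel makes it possible to tell "the caller never passed a timeout" apart from an explicit ``None``. A minimal sketch of how this surfaces for callers, assuming the public ``Client.query_and_wait()`` forwards ``wait_timeout`` to the helper above:

```python
from google.cloud import bigquery

client = bigquery.Client()

# Unset wait_timeout keeps the POLLING_DEFAULT_VALUE sentinel: each
# underlying API call uses its own default timeout, but the overall wait
# for the job to finish is unbounded.
rows = client.query_and_wait("SELECT 1")

# An explicit wait_timeout bounds the whole wait: if the query is still
# running after 60 seconds, the client attempts to cancel it.
rows = client.query_and_wait("SELECT 1", wait_timeout=60.0)
```
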
5 changes: 3 additions & 2 deletions google/cloud/bigquery/client.py
@@ -82,6 +82,7 @@
from google.cloud.bigquery._helpers import _DEFAULT_UNIVERSE
from google.cloud.bigquery._helpers import _validate_universe
from google.cloud.bigquery._helpers import _get_client_universe
from google.cloud.bigquery._helpers import TimeoutType
from google.cloud.bigquery._job_helpers import make_job_id as _make_job_id
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
@@ -107,6 +108,7 @@
DEFAULT_JOB_RETRY,
DEFAULT_RETRY,
DEFAULT_TIMEOUT,
DEFAULT_GET_JOB_TIMEOUT,
)
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
@@ -123,7 +125,6 @@
_versions_helpers.PANDAS_VERSIONS.try_import()
) # mypy check fails because pandas import is outside module, there are type: ignore comments related to this

TimeoutType = Union[float, None]
ResumableTimeoutType = Union[
None, float, Tuple[float, float]
] # for resumable media methods
@@ -2139,7 +2140,7 @@ def get_job(
project: Optional[str] = None,
location: Optional[str] = None,
retry: retries.Retry = DEFAULT_RETRY,
timeout: TimeoutType = DEFAULT_TIMEOUT,
timeout: TimeoutType = DEFAULT_GET_JOB_TIMEOUT,
) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]:
"""Fetch a job for the project associated with this client.

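
With this change, ``Client.get_job()`` defaults its ``timeout`` to the new ``DEFAULT_GET_JOB_TIMEOUT`` (128 seconds) instead of ``DEFAULT_TIMEOUT``. A short sketch of accepting or overriding the new default (the job ID is a placeholder):

```python
from google.cloud import bigquery

client = bigquery.Client()

# Uses the new 128-second DEFAULT_GET_JOB_TIMEOUT for the jobs.get request.
job = client.get_job("placeholder-job-id")

# Per-call overrides still work: a tighter timeout, or None to disable the
# HTTP timeout entirely.
job = client.get_job("placeholder-job-id", timeout=30.0)
job = client.get_job("placeholder-job-id", timeout=None)
```
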
29 changes: 12 additions & 17 deletions google/cloud/bigquery/job/base.py
@@ -26,8 +26,11 @@
import google.api_core.future.polling

from google.cloud.bigquery import _helpers
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery._helpers import _int_or_none
from google.cloud.bigquery.retry import (
DEFAULT_GET_JOB_TIMEOUT,
DEFAULT_RETRY,
)


_DONE_STATE = "DONE"
@@ -801,7 +804,7 @@ def reload(
self,
client=None,
retry: "retries.Retry" = DEFAULT_RETRY,
timeout: Optional[float] = None,
timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT,
):
"""API call: refresh job properties via a GET request.

@@ -820,22 +823,14 @@
"""
client = self._require_client(client)

extra_params = {}
if self.location:
extra_params["location"] = self.location
span_attributes = {"path": self.path}

api_response = client._call_api(
retry,
span_name="BigQuery.job.reload",
span_attributes=span_attributes,
job_ref=self,
method="GET",
path=self.path,
query_params=extra_params,
got_job = client.get_job(
self,
project=self.project,
location=self.location,
retry=retry,
timeout=timeout,
)
self._set_properties(api_response)
self._set_properties(got_job._properties)

def cancel(
self,
@@ -913,7 +908,7 @@ def _set_future_result(self):
def done(
self,
retry: "retries.Retry" = DEFAULT_RETRY,
timeout: Optional[float] = None,
timeout: Optional[float] = DEFAULT_GET_JOB_TIMEOUT,
reload: bool = True,
) -> bool:
"""Checks if the job is complete.
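
Because ``reload()`` now delegates to ``client.get_job()`` and ``done()`` defaults its ``timeout`` to ``DEFAULT_GET_JOB_TIMEOUT``, job polling inherits the 128-second HTTP timeout unless a caller overrides it. A minimal sketch of the resulting behavior (job ID is a placeholder):

```python
from google.cloud import bigquery

client = bigquery.Client()
job = client.get_job("placeholder-job-id")

# Both of these now go through client.get_job(), so each jobs.get request
# carries the 128-second default HTTP timeout.
job.reload()
finished = job.done()

# An explicit per-call timeout still takes precedence over the default.
finished = job.done(timeout=10.0)
```
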
34 changes: 24 additions & 10 deletions google/cloud/bigquery/job/query.py
@@ -40,7 +40,11 @@
StructQueryParameter,
UDFResource,
)
from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY
from google.cloud.bigquery.retry import (
DEFAULT_RETRY,
DEFAULT_JOB_RETRY,
POLLING_DEFAULT_VALUE,
)
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _EmptyRowIterator
@@ -1437,7 +1441,7 @@ def result( # type: ignore # (incompatible with supertype)
page_size: Optional[int] = None,
max_results: Optional[int] = None,
retry: Optional[retries.Retry] = DEFAULT_RETRY,
timeout: Optional[float] = None,
timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
start_index: Optional[int] = None,
job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
) -> Union["RowIterator", _EmptyRowIterator]:
@@ -1457,11 +1461,14 @@ def result( # type: ignore # (incompatible with supertype)
is ``DONE``, retrying is aborted early even if the
results are not available, as this will not change
anymore.
timeout (Optional[float]):
timeout (Optional[Union[float, \
google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
]]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
If multiple requests are made under the hood, ``timeout``
applies to each individual request.
before using ``retry``. If ``None``, wait indefinitely
unless an error is returned. If unset, only the
underlying API calls have their default timeouts, but we still
wait indefinitely for the job to finish.
start_index (Optional[int]):
The zero-based index of the starting row to read.
job_retry (Optional[google.api_core.retry.Retry]):
@@ -1507,6 +1514,13 @@ def result( # type: ignore # (incompatible with supertype)
# Intentionally omit job_id and query_id since this doesn't
# actually correspond to a finished query job.
)

# When timeout has the default sentinel value ``object()``, do not pass
# anything, so that subsequent calls fall back to their own default timeouts.
kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
if type(timeout) is not object:
kwargs["timeout"] = timeout

try:
retry_do_query = getattr(self, "_retry_do_query", None)
if retry_do_query is not None:
@@ -1548,7 +1562,7 @@ def is_job_done():
# rateLimitExceeded errors are ambiguous. We want to know if
# the query job failed and not just the call to
# jobs.getQueryResults.
if self.done(retry=retry, timeout=timeout):
if self.done(retry=retry, **kwargs):
# If it's already failed, we might as well stop.
job_failed_exception = self.exception()
if job_failed_exception is not None:
@@ -1585,14 +1599,14 @@ def is_job_done():
# response from the REST API. This ensures we aren't
# making any extra API calls if the previous loop
# iteration fetched the finished job.
self._reload_query_results(retry=retry, timeout=timeout)
self._reload_query_results(retry=retry, **kwargs)
return True

# Call jobs.getQueryResults with max results set to 0 just to
# wait for the query to finish. Unlike most methods,
# jobs.getQueryResults hangs as long as it can to ensure we
# know when the query has finished as soon as possible.
self._reload_query_results(retry=retry, timeout=timeout)
self._reload_query_results(retry=retry, **kwargs)

# Even if the query is finished now according to
# jobs.getQueryResults, we'll want to reload the job status if
@@ -1682,10 +1696,10 @@ def is_job_done():
max_results=max_results,
start_index=start_index,
retry=retry,
timeout=timeout,
query_id=self.query_id,
first_page_response=first_page_response,
num_dml_affected_rows=self._query_results.num_dml_affected_rows,
**kwargs,
)
rows._preserve_order = _contains_order_by(self.query)
return rows
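
The sentinel check in ``result()`` relies on ``POLLING_DEFAULT_VALUE`` being a bare ``object()`` instance: ``type(timeout) is not object`` is true for any real argument, including ``None``, so only the untouched default is omitted from downstream calls. A standalone sketch of the same pattern (names here are illustrative, not part of the library):

```python
from typing import Dict, Union

_UNSET = object()  # stand-in for POLLING_DEFAULT_VALUE


def build_timeout_kwargs(timeout: Union[float, None, object] = _UNSET) -> Dict[str, object]:
    """Omit ``timeout`` entirely when the caller never passed one."""
    kwargs: Dict[str, object] = {}
    if type(timeout) is not object:
        # Any real value, including None, is forwarded unchanged.
        kwargs["timeout"] = timeout
    return kwargs


assert build_timeout_kwargs() == {}                      # sentinel: use downstream defaults
assert build_timeout_kwargs(None) == {"timeout": None}   # wait indefinitely
assert build_timeout_kwargs(42.0) == {"timeout": 42.0}   # explicit bound
```
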
11 changes: 11 additions & 0 deletions google/cloud/bigquery/retry.py
@@ -14,6 +14,7 @@

from google.api_core import exceptions
from google.api_core import retry
import google.api_core.future.polling
from google.auth import exceptions as auth_exceptions # type: ignore
import requests.exceptions

@@ -140,3 +141,13 @@ def _job_should_retry(exc):
"""
The default job retry object.
"""

DEFAULT_GET_JOB_TIMEOUT = 128
"""
Default timeout for Client.get_job().
"""

POLLING_DEFAULT_VALUE = google.api_core.future.polling.PollingFuture._DEFAULT_VALUE
"""
Default value defined in google.api_core.future.polling.PollingFuture.
"""