Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add default timeout for Client.get_job() #1935

Merged
merged 24 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions google/cloud/bigquery/_job_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@
import functools
import os
import uuid
from typing import Any, Dict, TYPE_CHECKING, Optional
from typing import Any, Dict, Optional, TYPE_CHECKING, Union

import google.api_core.exceptions as core_exceptions
from google.api_core import retry as retries

from google.cloud.bigquery import job
import google.cloud.bigquery.query
from google.cloud.bigquery import table
from google.cloud.bigquery.retry import POLLING_DEFAULT_VALUE

# Avoid circular imports
if TYPE_CHECKING: # pragma: NO COVER
Expand Down Expand Up @@ -328,7 +329,7 @@ def query_and_wait(
location: Optional[str],
project: str,
api_timeout: Optional[float] = None,
wait_timeout: Optional[float] = None,
wait_timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
retry: Optional[retries.Retry],
job_retry: Optional[retries.Retry],
page_size: Optional[int] = None,
Expand Down Expand Up @@ -364,10 +365,12 @@ def query_and_wait(
api_timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
wait_timeout (Optional[float]):
wait_timeout (Optional[Union[float, object]]):
The number of seconds to wait for the query to finish. If the
query doesn't finish before this timeout, the client attempts
to cancel the query.
to cancel the query. If unset, the underlying Client.get_job() API
call has a timeout, but we still wait indefinitely for the job to
finish.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC. This only applies to making RPC
calls. It isn't used to retry failed jobs. This has
Expand Down Expand Up @@ -545,7 +548,7 @@ def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool:
def _wait_or_cancel(
job: job.QueryJob,
api_timeout: Optional[float],
wait_timeout: Optional[float],
wait_timeout: Optional[Union[object, float]],
retry: Optional[retries.Retry],
page_size: Optional[int],
max_results: Optional[int],
Expand Down
3 changes: 2 additions & 1 deletion google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
DEFAULT_JOB_RETRY,
DEFAULT_RETRY,
DEFAULT_TIMEOUT,
DEFAULT_GET_JOB_TIMEOUT,
)
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
Expand Down Expand Up @@ -2139,7 +2140,7 @@ def get_job(
project: Optional[str] = None,
location: Optional[str] = None,
retry: retries.Retry = DEFAULT_RETRY,
timeout: TimeoutType = DEFAULT_TIMEOUT,
timeout: TimeoutType = DEFAULT_GET_JOB_TIMEOUT,
) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]:
"""Fetch a job for the project associated with this client.

Expand Down
53 changes: 33 additions & 20 deletions google/cloud/bigquery/job/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
import http
import threading
import typing
from typing import ClassVar, Dict, Optional, Sequence
from typing import Any, ClassVar, Dict, Optional, Sequence, Union

from google.api_core import retry as retries
from google.api_core import exceptions
import google.api_core.future.polling

from google.cloud.bigquery import _helpers
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.retry import DEFAULT_RETRY, POLLING_DEFAULT_VALUE
from google.cloud.bigquery._helpers import _int_or_none


Expand Down Expand Up @@ -801,7 +801,7 @@ def reload(
self,
client=None,
retry: "retries.Retry" = DEFAULT_RETRY,
timeout: Optional[float] = None,
timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd much rather this defaulted to the same timeout as get_job(). Anywhere that calls reload(), we should check for POLLING_DEFAULT_VALUE before calling it, so that we only pass in a float or None, or omit the value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

):
"""API call: refresh job properties via a GET request.

Expand All @@ -814,28 +814,39 @@ def reload(
``client`` stored on the current dataset.

retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
timeout (Optional[float]):
timeout (Optional[Union[float, \
google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
]]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
"""
client = self._require_client(client)

extra_params = {}
if self.location:
extra_params["location"] = self.location
span_attributes = {"path": self.path}
kwargs: Dict[str, Any] = {}
if type(timeout) is object:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add some comments explaining why we need to omit the timeout if we get _DEFAULT_VALUE. (Because we want to use the default API-level timeouts but wait indefinitely for the job to finish)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added an explanation of why kwargs is used here, but I feel like QueryJob.result() is a better place to explain the default timeout behavior, so I added it to the docstring there instead.

# When timeout is the sentinel value, we use the default API-level
# timeout at Client.get_job().
pass
elif timeout is None:
# If timeout is None, wait indefinitely even at API-level.
kwargs["timeout"] = None
elif isinstance(timeout, (int, float)):
kwargs["timeout"] = timeout
else:
raise ValueError(
f"Unsupported timeout type {type(timeout)}. "
"Must be float, int, None, or "
"google.api_core.future.polling.PollingFuture._DEFAULT_VALUE."
)

api_response = client._call_api(
retry,
span_name="BigQuery.job.reload",
span_attributes=span_attributes,
job_ref=self,
method="GET",
path=self.path,
query_params=extra_params,
timeout=timeout,
got_job = client.get_job(
self,
project=self.project,
location=self.location,
retry=retry,
**kwargs,
)
self._set_properties(api_response)
self._set_properties(got_job._properties)

def cancel(
self,
Expand Down Expand Up @@ -913,7 +924,7 @@ def _set_future_result(self):
def done(
self,
retry: "retries.Retry" = DEFAULT_RETRY,
timeout: Optional[float] = None,
timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. I don't think this needs to handle POLLING_DEFAULT_VALUE since it's not waiting for the job to finish, just making at most 1 API call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

reload: bool = True,
) -> bool:
"""Checks if the job is complete.
Expand All @@ -922,7 +933,9 @@ def done(
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC. If the job state is ``DONE``, retrying is aborted
early, as the job will not change anymore.
timeout (Optional[float]):
timeout (Optional[Union[float, \
google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
]]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
reload (Optional[bool]):
Expand Down
30 changes: 23 additions & 7 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@
StructQueryParameter,
UDFResource,
)
from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY
from google.cloud.bigquery.retry import (
DEFAULT_RETRY,
DEFAULT_JOB_RETRY,
POLLING_DEFAULT_VALUE,
)
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _EmptyRowIterator
Expand Down Expand Up @@ -1437,7 +1441,7 @@ def result( # type: ignore # (incompatible with supertype)
page_size: Optional[int] = None,
max_results: Optional[int] = None,
retry: Optional[retries.Retry] = DEFAULT_RETRY,
timeout: Optional[float] = None,
timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
start_index: Optional[int] = None,
job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
) -> Union["RowIterator", _EmptyRowIterator]:
Expand All @@ -1457,11 +1461,14 @@ def result( # type: ignore # (incompatible with supertype)
is ``DONE``, retrying is aborted early even if the
results are not available, as this will not change
anymore.
timeout (Optional[float]):
timeout (Optional[Union[float, \
google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
]]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
If multiple requests are made under the hood, ``timeout``
applies to each individual request.
before using ``retry``. If ``None``, wait indefinitely
unless a retriable error is returned. If unset, only the
Linchin marked this conversation as resolved.
Show resolved Hide resolved
underlying Client.get_job() API call has timeout, but we still
Linchin marked this conversation as resolved.
Show resolved Hide resolved
wait indefinitely for the job to finish.
start_index (Optional[int]):
The zero-based index of the starting row to read.
job_retry (Optional[google.api_core.retry.Retry]):
Expand Down Expand Up @@ -1507,6 +1514,15 @@ def result( # type: ignore # (incompatible with supertype)
# Intentionally omit job_id and query_id since this doesn't
# actually correspond to a finished query job.
)

# When timeout has default sentinel value, only pass the sentinel
# timeout to QueryJob.done(), and use None for the other timeouts.
if type(timeout) is object:
timeout = None
get_job_timeout = POLLING_DEFAULT_VALUE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not set to DEFAULT_GET_JOB_TIMEOUT or do the kwargs trick to omit it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

else:
get_job_timeout = timeout

try:
retry_do_query = getattr(self, "_retry_do_query", None)
if retry_do_query is not None:
Expand Down Expand Up @@ -1548,7 +1564,7 @@ def is_job_done():
# rateLimitExceeded errors are ambiguous. We want to know if
# the query job failed and not just the call to
# jobs.getQueryResults.
if self.done(retry=retry, timeout=timeout):
if self.done(retry=retry, timeout=get_job_timeout):
# If it's already failed, we might as well stop.
job_failed_exception = self.exception()
if job_failed_exception is not None:
Expand Down
11 changes: 11 additions & 0 deletions google/cloud/bigquery/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from google.api_core import exceptions
from google.api_core import retry
import google.api_core.future.polling
from google.auth import exceptions as auth_exceptions # type: ignore
import requests.exceptions

Expand Down Expand Up @@ -140,3 +141,13 @@ def _job_should_retry(exc):
"""
The default job retry object.
"""

DEFAULT_GET_JOB_TIMEOUT = 128
"""
Default timeout for Client.get_job().
"""

POLLING_DEFAULT_VALUE = google.api_core.future.polling.PollingFuture._DEFAULT_VALUE
"""
Default value defined in google.api_core.future.polling.PollingFuture.
"""
Loading