Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add default timeout for Client.get_job() #1935

Merged
merged 24 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
DEFAULT_JOB_RETRY,
DEFAULT_RETRY,
DEFAULT_TIMEOUT,
DEFAULT_GET_JOB_TIMEOUT,
)
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
Expand Down Expand Up @@ -2139,7 +2140,7 @@ def get_job(
project: Optional[str] = None,
location: Optional[str] = None,
retry: retries.Retry = DEFAULT_RETRY,
timeout: TimeoutType = DEFAULT_TIMEOUT,
timeout: Optional[Union[TimeoutType, object]] = DEFAULT_GET_JOB_TIMEOUT,
Linchin marked this conversation as resolved.
Show resolved Hide resolved
) -> Union[job.LoadJob, job.CopyJob, job.ExtractJob, job.QueryJob, job.UnknownJob]:
"""Fetch a job for the project associated with this client.

Expand All @@ -2162,7 +2163,9 @@ def get_job(
object.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
timeout (Optional[float]):
timeout (Optional[Union[float, \
google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
Linchin marked this conversation as resolved.
Show resolved Hide resolved
]]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.

Expand Down
49 changes: 30 additions & 19 deletions google/cloud/bigquery/job/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import http
import threading
import typing
from typing import ClassVar, Dict, Optional, Sequence
from typing import Any, ClassVar, Dict, Optional, Sequence, Union

from google.api_core import retry as retries
from google.api_core import exceptions
import google.api_core.future.polling
from google.api_core.future.polling import PollingFuture
Linchin marked this conversation as resolved.
Show resolved Hide resolved

from google.cloud.bigquery import _helpers
from google.cloud.bigquery.retry import DEFAULT_RETRY
Expand Down Expand Up @@ -801,7 +802,7 @@ def reload(
self,
client=None,
retry: "retries.Retry" = DEFAULT_RETRY,
timeout: Optional[float] = None,
timeout: Optional[Union[float, object]] = PollingFuture._DEFAULT_VALUE,
):
"""API call: refresh job properties via a GET request.

Expand All @@ -814,28 +815,36 @@ def reload(
``client`` stored on the current dataset.

retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
timeout (Optional[float]):
timeout (Optional[Union[float, \
google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
]]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
"""
client = self._require_client(client)

extra_params = {}
if self.location:
extra_params["location"] = self.location
span_attributes = {"path": self.path}
kwargs: Dict[str, Any] = {}
if type(timeout) is object:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add some comments explaining why we need to omit the timeout if we get _DEFAULT_VALUE. (Because we want to use the default API-level timeouts but wait indefinitely for the job to finish)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added an explanation of why kwargs is used here, but I feel like QueryJob.result() is a better place to explain the default timeout behavior, so I added it to the docstring there instead.

pass
elif timeout is None:
kwargs["timeout"] = None
elif isinstance(timeout, (int, float)):
kwargs["timeout"] = timeout
else:
raise ValueError(
f"Unsupported timeout type {type(timeout)}. "
"Must be float, int, None, or "
"google.api_core.future.polling.PollingFuture._DEFAULT_VALUE."
)

api_response = client._call_api(
retry,
span_name="BigQuery.job.reload",
span_attributes=span_attributes,
job_ref=self,
method="GET",
path=self.path,
query_params=extra_params,
timeout=timeout,
got_job = client.get_job(
self,
project=self.project,
location=self.location,
retry=retry,
**kwargs,
)
self._set_properties(api_response)
self._set_properties(got_job._properties)

def cancel(
self,
Expand Down Expand Up @@ -913,7 +922,7 @@ def _set_future_result(self):
def done(
self,
retry: "retries.Retry" = DEFAULT_RETRY,
timeout: Optional[float] = None,
timeout: Optional[Union[float, object]] = PollingFuture._DEFAULT_VALUE,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done isn't waiting for the job to complete, so I wonder if we actually need PollingFuture._DEFAULT_VALUE here?

It seems more analogous to Client.get_job so maybe we can use the same DEFAULT_GET_JOB_TIMEOUT, here? 🤔 We need to double-check that we aren't calling any other API requests like getQueryResults, if so.

Copy link
Contributor Author

@Linchin Linchin May 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done() calls reload(), which in turn calls Client.get_job(). So I added the sentinel values along the path result() -> done() -> reload(). Indeed I realize we don't have to add sentinel for done() or reload(), because it will be passed from result() anyway. As to getQueryResults, I don't think it's called from done() onward (it's used in result()). I think either way (using PollingFuture._DEFAULT_VALUE or DEFAULT_GET_JOB_TIMEOUT) it's effectively the same, so I think either way it's fine. Are there any other factors I'm not aware of?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to keep our type annotations as narrow as we can. My worry with PollingFuture._DEFAULT_VALUE is it opens it up to object, which technically is anything (string, bytes, anything) in Python.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, I ended up only using sentinel for result(), and used DEFAULT_GET_JOB_TIMEOUT for subsequent calls.

reload: bool = True,
) -> bool:
"""Checks if the job is complete.
Expand All @@ -922,7 +931,9 @@ def done(
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC. If the job state is ``DONE``, retrying is aborted
early, as the job will not change anymore.
timeout (Optional[float]):
timeout (Optional[Union[float, \
google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
]]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
reload (Optional[bool]):
Expand Down
22 changes: 18 additions & 4 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from google.api_core import exceptions
from google.api_core import retry as retries
from google.api_core.future.polling import PollingFuture
import requests

from google.cloud.bigquery.dataset import Dataset
Expand Down Expand Up @@ -1437,7 +1438,7 @@ def result( # type: ignore # (incompatible with supertype)
page_size: Optional[int] = None,
max_results: Optional[int] = None,
retry: Optional[retries.Retry] = DEFAULT_RETRY,
timeout: Optional[float] = None,
timeout: Optional[Union[float, object]] = PollingFuture._DEFAULT_VALUE,
start_index: Optional[int] = None,
job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
) -> Union["RowIterator", _EmptyRowIterator]:
Expand All @@ -1457,9 +1458,13 @@ def result( # type: ignore # (incompatible with supertype)
is ``DONE``, retrying is aborted early even if the
results are not available, as this will not change
anymore.
timeout (Optional[float]):
timeout (Optional[Union[float, \
google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
]]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. If ``None``, wait indefinitely
unless a retriable error is returned. If unset, only the
Linchin marked this conversation as resolved.
Show resolved Hide resolved
underlying ``Client.get_job()`` call has a timeout.
If multiple requests are made under the hood, ``timeout``
applies to each individual request.
start_index (Optional[int]):
Expand Down Expand Up @@ -1507,6 +1512,15 @@ def result( # type: ignore # (incompatible with supertype)
# Intentionally omit job_id and query_id since this doesn't
# actually correspond to a finished query job.
)

# When timeout has default sentinel value, only pass the sentinel
# timeout to QueryJob.done(), and use None for the other timeouts.
if type(timeout) is object:
timeout = None
get_job_timeout = PollingFuture._DEFAULT_VALUE
else:
get_job_timeout = timeout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do the kwargs trick here too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it's necessary - the purpose here is slightly different from when calling client.get_job(). I'm only preserving the sentinel value for the job.done() call, and use None as default for any other calls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, following later comments


try:
retry_do_query = getattr(self, "_retry_do_query", None)
if retry_do_query is not None:
Expand Down Expand Up @@ -1548,7 +1562,7 @@ def is_job_done():
# rateLimitExceeded errors are ambiguous. We want to know if
# the query job failed and not just the call to
# jobs.getQueryResults.
if self.done(retry=retry, timeout=timeout):
if self.done(retry=retry, timeout=get_job_timeout):
# If it's already failed, we might as well stop.
job_failed_exception = self.exception()
if job_failed_exception is not None:
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/bigquery/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,8 @@ def _job_should_retry(exc):
"""
The default job retry object.
"""

DEFAULT_GET_JOB_TIMEOUT = 128
"""
Default timeout for Client.get_job().
"""
53 changes: 38 additions & 15 deletions tests/unit/job/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from google.api_core import exceptions
import google.api_core.retry
from google.api_core.future import polling
from google.api_core.future.polling import PollingFuture
import pytest

from ..helpers import make_connection
Expand Down Expand Up @@ -709,7 +710,7 @@ def test_exists_w_timeout(self):
)

def test_reload_defaults(self):
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_GET_JOB_TIMEOUT

resource = {
"jobReference": {
Expand All @@ -729,15 +730,19 @@ def test_reload_defaults(self):

call_api.assert_called_once_with(
DEFAULT_RETRY,
span_name="BigQuery.job.reload",
span_name="BigQuery.getJob",
span_attributes={
"path": "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID)
"path": "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID),
"job_id": "job-id",
"location": "us-central",
},
job_ref=job,
method="GET",
path="/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID),
query_params={"location": self.LOCATION},
timeout=None,
query_params={
"projection": "full",
"location": "us-central",
},
timeout=DEFAULT_GET_JOB_TIMEOUT,
)
self.assertEqual(job._properties, expected)

Expand All @@ -762,16 +767,21 @@ def test_reload_explicit(self):
retry = DEFAULT_RETRY.with_deadline(1)
job.reload(client=client, retry=retry, timeout=4.2)

# Unsupported timeout type
with pytest.raises(ValueError):
job.reload(client=client, retry=retry, timeout="4.2")

call_api.assert_called_once_with(
retry,
span_name="BigQuery.job.reload",
span_name="BigQuery.getJob",
span_attributes={
"path": "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID)
"path": "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID),
"job_id": "job-id",
"location": None,
},
job_ref=job,
method="GET",
path="/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID),
query_params={},
query_params={"projection": "full"},
timeout=4.2,
)
self.assertEqual(job._properties, expected)
Expand Down Expand Up @@ -952,7 +962,10 @@ def test_done_defaults_wo_state(self):

self.assertFalse(job.done())

reload_.assert_called_once_with(retry=DEFAULT_RETRY, timeout=None)
reload_.assert_called_once_with(
retry=DEFAULT_RETRY,
timeout=PollingFuture._DEFAULT_VALUE,
)

def test_done_explicit_wo_state(self):
from google.cloud.bigquery.retry import DEFAULT_RETRY
Expand All @@ -974,6 +987,8 @@ def test_done_already(self):
self.assertTrue(job.done())

def test_result_default_wo_state(self):
from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT

begun_job_resource = _make_job_resource(
job_id=self.JOB_ID, project_id=self.PROJECT, location="US", started=True
)
Expand Down Expand Up @@ -1003,12 +1018,17 @@ def test_result_default_wo_state(self):
reload_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}",
query_params={"location": "US"},
timeout=None,
query_params={
"projection": "full",
"location": "US",
},
timeout=DEFAULT_GET_JOB_TIMEOUT,
)
conn.api_request.assert_has_calls([begin_call, begin_call, reload_call])

def test_result_w_retry_wo_state(self):
from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT

begun_job_resource = _make_job_resource(
job_id=self.JOB_ID, project_id=self.PROJECT, location="EU", started=True
)
Expand Down Expand Up @@ -1054,8 +1074,11 @@ def test_result_w_retry_wo_state(self):
reload_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}",
query_params={"location": "EU"},
timeout=None,
query_params={
"projection": "full",
"location": "EU",
},
timeout=DEFAULT_GET_JOB_TIMEOUT,
)
conn.api_request.assert_has_calls(
[begin_call, begin_call, reload_call, reload_call]
Expand Down
34 changes: 30 additions & 4 deletions tests/unit/job/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,8 @@ def test_exists_hit_w_alternate_client(self):
)

def test_reload_w_bound_client(self):
from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT

PATH = "/projects/%s/jobs/%s" % (self.PROJECT, self.JOB_ID)
RESOURCE = self._make_resource()
conn = make_connection(RESOURCE)
Expand All @@ -489,14 +491,27 @@ def test_reload_w_bound_client(self):
) as final_attributes:
job.reload()

final_attributes.assert_called_with({"path": PATH}, client, job)
final_attributes.assert_called_with(
{
"path": PATH,
"job_id": self.JOB_ID,
"location": None,
},
client,
None,
)

conn.api_request.assert_called_once_with(
method="GET", path=PATH, query_params={}, timeout=None
method="GET",
path=PATH,
query_params={"projection": "full"},
timeout=DEFAULT_GET_JOB_TIMEOUT,
)
self._verifyResourceProperties(job, RESOURCE)

def test_reload_w_alternate_client(self):
from google.cloud.bigquery.retry import DEFAULT_GET_JOB_TIMEOUT

PATH = "/projects/%s/jobs/%s" % (self.PROJECT, self.JOB_ID)
RESOURCE = self._make_resource()
conn1 = make_connection()
Expand All @@ -511,10 +526,21 @@ def test_reload_w_alternate_client(self):
) as final_attributes:
job.reload(client=client2)

final_attributes.assert_called_with({"path": PATH}, client2, job)
final_attributes.assert_called_with(
{
"path": PATH,
"job_id": self.JOB_ID,
"location": None,
},
client2,
None,
)

conn1.api_request.assert_not_called()
conn2.api_request.assert_called_once_with(
method="GET", path=PATH, query_params={}, timeout=None
method="GET",
path=PATH,
query_params={"projection": "full"},
timeout=DEFAULT_GET_JOB_TIMEOUT,
)
self._verifyResourceProperties(job, RESOURCE)
Loading