Skip to content

Commit

Permalink
fix: use an allowlist instead of denylist to determine when `query_an…
Browse files Browse the repository at this point in the history
…d_wait` uses `jobs.query` API (#1869)
  • Loading branch information
tswast authored Mar 27, 2024
1 parent b0e95a0 commit e265db6
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 19 deletions.
53 changes: 36 additions & 17 deletions google/cloud/bigquery/_job_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,13 @@ def query_and_wait(
:class:`~google.cloud.bigquery.job.QueryJobConfig`
class.
"""
request_body = _to_query_request(
query=query, job_config=job_config, location=location, timeout=api_timeout
)

# Some API parameters aren't supported by the jobs.query API. In these
# cases, fallback to a jobs.insert call.
if not _supported_by_jobs_query(job_config):
if not _supported_by_jobs_query(request_body):
return _wait_or_cancel(
query_jobs_insert(
client=client,
Expand All @@ -424,9 +428,6 @@ def query_and_wait(
)

path = _to_query_path(project)
request_body = _to_query_request(
query=query, job_config=job_config, location=location, timeout=api_timeout
)

if page_size is not None and max_results is not None:
request_body["maxResults"] = min(page_size, max_results)
Expand Down Expand Up @@ -506,20 +507,38 @@ def do_query():
return do_query()


def _supported_by_jobs_query(job_config: Optional[job.QueryJobConfig]) -> bool:
def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool:
"""True if jobs.query can be used. False if jobs.insert is needed."""
if job_config is None:
return True

return (
# These features aren't supported by jobs.query.
job_config.clustering_fields is None
and job_config.destination is None
and job_config.destination_encryption_configuration is None
and job_config.range_partitioning is None
and job_config.table_definitions is None
and job_config.time_partitioning is None
)
request_keys = frozenset(request_body.keys())

# Per issue: https://github.com/googleapis/python-bigquery/issues/1867
# use an allowlist here instead of a denylist because the backend API allows
# unsupported parameters without any warning or failure. Instead, keep this
# set in sync with those in QueryRequest:
# https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest
keys_allowlist = {
"kind",
"query",
"maxResults",
"defaultDataset",
"timeoutMs",
"dryRun",
"preserveNulls",
"useQueryCache",
"useLegacySql",
"parameterMode",
"queryParameters",
"location",
"formatOptions",
"connectionProperties",
"labels",
"maximumBytesBilled",
"requestId",
"createSession",
}

unsupported_keys = request_keys - keys_allowlist
return len(unsupported_keys) == 0


def _wait_or_cancel(
Expand Down
15 changes: 13 additions & 2 deletions tests/unit/test__job_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import pytest

from google.cloud.bigquery.client import Client
from google.cloud.bigquery import enums
from google.cloud.bigquery import _job_helpers
from google.cloud.bigquery.job import copy_ as job_copy
from google.cloud.bigquery.job import extract as job_extract
Expand Down Expand Up @@ -1141,12 +1142,22 @@ def test_make_job_id_w_job_id_overrides_prefix():
False,
id="destination_encryption_configuration",
),
# priority="BATCH" is not supported. See:
# https://github.com/googleapis/python-bigquery/issues/1867
pytest.param(
job_query.QueryJobConfig(
priority=enums.QueryPriority.BATCH,
),
False,
id="priority=BATCH",
),
),
)
def test_supported_by_jobs_query(
def test_supported_by_jobs_query_from_queryjobconfig(
job_config: Optional[job_query.QueryJobConfig], expected: bool
):
assert _job_helpers._supported_by_jobs_query(job_config) == expected
request_body = _job_helpers._to_query_request(job_config, query="SELECT 1")
assert _job_helpers._supported_by_jobs_query(request_body) == expected


def test_wait_or_cancel_no_exception():
Expand Down

0 comments on commit e265db6

Please sign in to comment.