feat: add configuration option to read_gbq (#401)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes https://togithub.com/googleapis/python-bigquery-dataframes/issues/384 🦕
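
For context, a minimal sketch of how the new `configuration` argument is intended to be used from `bigframes.pandas.read_gbq` (the query and limits below are illustrative only; note that `configuration` is rejected when a plain table name is passed instead of a query):

import bigframes.pandas as bpd

# Job options use the BigQuery REST API property names.
config = {
    "query": {
        "useQueryCache": False,
        "maximumBytesBilled": "1000000000",  # fail the job beyond ~1 GB billed
    }
}

df = bpd.read_gbq(
    """
    SELECT name, SUM(number) AS total
    FROM `bigquery-public-data.usa_names.usa_1910_2013`
    GROUP BY name
    """,
    configuration=config,
)
print(df.head())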
Genesis929 authored Mar 22, 2024
1 parent ad0e99e commit 85cede2
Showing 5 changed files with 145 additions and 19 deletions.
8 changes: 6 additions & 2 deletions bigframes/pandas/__init__.py
@@ -492,9 +492,10 @@ def read_gbq(
*,
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
configuration: Optional[Dict] = None,
max_results: Optional[int] = None,
filters: vendored_pandas_gbq.FiltersType = (),
use_cache: bool = True,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
) -> bigframes.dataframe.DataFrame:
_set_default_session_location_if_possible(query_or_table)
@@ -503,6 +504,7 @@ def read_gbq(
query_or_table,
index_col=index_col,
columns=columns,
configuration=configuration,
max_results=max_results,
filters=filters,
use_cache=use_cache,
@@ -528,8 +530,9 @@ def read_gbq_query(
*,
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
configuration: Optional[Dict] = None,
max_results: Optional[int] = None,
use_cache: bool = True,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
) -> bigframes.dataframe.DataFrame:
_set_default_session_location_if_possible(query)
@@ -538,6 +541,7 @@ def read_gbq_query(
query,
index_col=index_col,
columns=columns,
configuration=configuration,
max_results=max_results,
use_cache=use_cache,
col_order=col_order,
95 changes: 83 additions & 12 deletions bigframes/session/__init__.py
@@ -16,6 +16,7 @@

from __future__ import annotations

import copy
import datetime
import itertools
import logging
@@ -283,9 +284,10 @@ def read_gbq(
*,
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
configuration: Optional[Dict] = None,
max_results: Optional[int] = None,
filters: third_party_pandas_gbq.FiltersType = (),
use_cache: bool = True,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
# Add a verify index argument that fails if the index is not unique.
) -> dataframe.DataFrame:
@@ -306,6 +308,7 @@ def read_gbq(
query_or_table,
index_col=index_col,
columns=columns,
configuration=configuration,
max_results=max_results,
api_name="read_gbq",
use_cache=use_cache,
@@ -314,13 +317,20 @@ def read_gbq(
# TODO(swast): Query the snapshot table but mark it as a
# deterministic query so we can avoid serializing if we have a
# unique index.
if configuration is not None:
raise ValueError(
"The 'configuration' argument is not allowed when "
"directly reading from a table. Please remove "
"'configuration' or use a query."
)

return self._read_gbq_table(
query_or_table,
index_col=index_col,
columns=columns,
max_results=max_results,
api_name="read_gbq",
use_cache=use_cache,
use_cache=use_cache if use_cache is not None else True,
)

def _to_query(
@@ -405,7 +415,7 @@ def _query_to_destination(
query: str,
index_cols: List[str],
api_name: str,
use_cache: bool = True,
configuration: dict = {"query": {"useQueryCache": True}},
) -> Tuple[Optional[bigquery.TableReference], Optional[bigquery.QueryJob]]:
# If a dry_run indicates this is not a query type job, then don't
# bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
@@ -427,23 +437,35 @@ def _query_to_destination(
][:_MAX_CLUSTER_COLUMNS]
temp_table = self._create_empty_temp_table(schema, cluster_cols)

job_config = bigquery.QueryJobConfig()
timeout_ms = configuration.get("jobTimeoutMs") or configuration["query"].get(
"timeoutMs"
)

# Convert timeout_ms to seconds, ensuring a minimum of 0.1 seconds to avoid
# the program getting stuck on too-short timeouts.
timeout = max(int(timeout_ms) * 1e-3, 0.1) if timeout_ms else None

job_config = typing.cast(
bigquery.QueryJobConfig,
bigquery.QueryJobConfig.from_api_repr(configuration),
)
job_config.labels["bigframes-api"] = api_name
job_config.destination = temp_table
job_config.use_query_cache = use_cache

try:
# Write to temp table to workaround BigQuery 10 GB query results
# limit. See: internal issue 303057336.
job_config.labels["error_caught"] = "true"
_, query_job = self._start_query(query, job_config=job_config)
_, query_job = self._start_query(
query, job_config=job_config, timeout=timeout
)
return query_job.destination, query_job
except google.api_core.exceptions.BadRequest:
# Some SELECT statements still aren't compatible with cluster
# tables as the destination. For example, if the query has a
# top-level ORDER BY, this conflicts with our ability to cluster
# the table by the index column(s).
_, query_job = self._start_query(query)
_, query_job = self._start_query(query, timeout=timeout)
return query_job.destination, query_job

def read_gbq_query(
@@ -452,8 +474,9 @@ def read_gbq_query(
*,
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
configuration: Optional[Dict] = None,
max_results: Optional[int] = None,
use_cache: bool = True,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
) -> dataframe.DataFrame:
"""Turn a SQL query into a DataFrame.
@@ -517,6 +540,7 @@ def read_gbq_query(
query=query,
index_col=index_col,
columns=columns,
configuration=configuration,
max_results=max_results,
api_name="read_gbq_query",
use_cache=use_cache,
@@ -528,10 +552,34 @@ def _read_gbq_query(
*,
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
configuration: Optional[Dict] = None,
max_results: Optional[int] = None,
api_name: str = "read_gbq_query",
use_cache: bool = True,
use_cache: Optional[bool] = None,
) -> dataframe.DataFrame:
configuration = _transform_read_gbq_configuration(configuration)

if "query" not in configuration:
configuration["query"] = {}

if "query" in configuration["query"]:
raise ValueError(
"The query statement must not be included in the ",
"'configuration' because it is already provided as",
" a separate parameter.",
)

if "useQueryCache" in configuration["query"]:
if use_cache is not None:
raise ValueError(
"'useQueryCache' in 'configuration' conflicts with"
" 'use_cache' parameter. Please specify only one."
)
else:
configuration["query"]["useQueryCache"] = (
True if use_cache is None else use_cache
)

if isinstance(index_col, str):
index_cols = [index_col]
else:
@@ -541,7 +589,7 @@ def _read_gbq_query(
query,
index_cols,
api_name=api_name,
use_cache=use_cache,
configuration=configuration,
)

# If there was no destination table, that means the query must have
Expand All @@ -565,7 +613,7 @@ def _read_gbq_query(
index_col=index_cols,
columns=columns,
max_results=max_results,
use_cache=use_cache,
use_cache=configuration["query"]["useQueryCache"],
)

def read_gbq_table(
@@ -1656,13 +1704,14 @@ def _start_query(
sql: str,
job_config: Optional[bigquery.job.QueryJobConfig] = None,
max_results: Optional[int] = None,
timeout: Optional[float] = None,
) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
"""
Starts BigQuery query job and waits for results.
"""
job_config = self._prepare_query_job_config(job_config)
return bigframes.session._io.bigquery.start_query_with_client(
self.bqclient, sql, job_config, max_results
self.bqclient, sql, job_config, max_results, timeout
)

def _start_query_ml_ddl(
@@ -1876,3 +1925,25 @@ def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringValue:
# Escape backslashes and use backslash as delineator
escaped = typing.cast(ibis_types.StringColumn, result.fillna("")).replace("\\", "\\\\") # type: ignore
return typing.cast(ibis_types.StringColumn, ibis.literal("\\")).concat(escaped)


def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict:
"""
For backwards-compatibility, convert any previously client-side only
parameters such as timeoutMs to the property name expected by the REST API.
Makes a copy of configuration if changes are needed.
"""

if configuration is None:
return {}

timeout_ms = configuration.get("query", {}).get("timeoutMs")
if timeout_ms is not None:
# Transform timeoutMs to an actual server-side configuration.
# https://github.com/googleapis/python-bigquery-pandas/issues/479
configuration = copy.deepcopy(configuration)
del configuration["query"]["timeoutMs"]
configuration["jobTimeoutMs"] = timeout_ms

return configuration
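
Taken together, the new timeout plumbing works roughly as follows. This standalone sketch (not part of the diff) mirrors `_transform_read_gbq_configuration` and the millisecond-to-second conversion in `_query_to_destination`:

import copy
from typing import Optional

def transform_config(configuration: Optional[dict]) -> dict:
    # Rewrite the client-only query.timeoutMs into the server-side jobTimeoutMs.
    if configuration is None:
        return {}
    timeout_ms = configuration.get("query", {}).get("timeoutMs")
    if timeout_ms is not None:
        configuration = copy.deepcopy(configuration)
        del configuration["query"]["timeoutMs"]
        configuration["jobTimeoutMs"] = timeout_ms
    return configuration

config = transform_config({"query": {"useQueryCache": True, "timeoutMs": 10000}})
# -> {"query": {"useQueryCache": True}, "jobTimeoutMs": 10000}

# jobTimeoutMs wins; otherwise fall back to any remaining query-level timeoutMs.
timeout_ms = config.get("jobTimeoutMs") or config.get("query", {}).get("timeoutMs")
# Convert to seconds with a 0.1 s floor so an extremely short timeout cannot
# wedge the client call.
timeout = max(int(timeout_ms) * 1e-3, 0.1) if timeout_ms else None
print(config, timeout)  # {'query': {'useQueryCache': True}, 'jobTimeoutMs': 10000} 10.0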
3 changes: 2 additions & 1 deletion bigframes/session/_io/bigquery.py
@@ -220,6 +220,7 @@ def start_query_with_client(
sql: str,
job_config: bigquery.job.QueryJobConfig,
max_results: Optional[int] = None,
timeout: Optional[float] = None,
) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
"""
Starts query job and waits for results.
@@ -230,7 +231,7 @@ def start_query_with_client(
)

try:
query_job = bq_client.query(sql, job_config=job_config)
query_job = bq_client.query(sql, job_config=job_config, timeout=timeout)
except google.api_core.exceptions.Forbidden as ex:
if "Drive credentials" in ex.message:
ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
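
For reference, the underlying google-cloud-bigquery call that now receives the timeout looks roughly like this (assuming application-default credentials and a default project):

from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.QueryJobConfig(use_query_cache=False)

# `timeout` is the per-request timeout in seconds that
# start_query_with_client forwards to Client.query above.
query_job = client.query("SELECT 1 AS x", job_config=job_config, timeout=10.0)
rows = list(query_job.result())
print(rows[0].x)  # 1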
42 changes: 42 additions & 0 deletions tests/system/small/test_session.py
@@ -20,6 +20,7 @@
import typing
from typing import List

import google
import google.cloud.bigquery as bigquery
import numpy as np
import pandas as pd
@@ -363,6 +364,47 @@ def test_read_gbq_table_wildcard_with_filter(session: bigframes.Session):
assert df.shape == (348485, 32)


@pytest.mark.parametrize(
("config"),
[
{
"query": {
"useQueryCache": True,
"maximumBytesBilled": "1000000000",
"timeoutMs": 10000,
}
},
pytest.param(
{"query": {"useQueryCache": True, "timeoutMs": 50}},
marks=pytest.mark.xfail(
raises=google.api_core.exceptions.BadRequest,
reason="Expected failure due to timeout being set too short.",
),
),
pytest.param(
{"query": {"useQueryCache": False, "maximumBytesBilled": "100"}},
marks=pytest.mark.xfail(
raises=google.api_core.exceptions.InternalServerError,
reason="Expected failure when the query exceeds the maximum bytes billed limit.",
),
),
],
)
def test_read_gbq_with_configuration(
session: bigframes.Session, scalars_table_id: str, config: dict
):
query = f"""SELECT
t.float64_col * 2 AS my_floats,
CONCAT(t.string_col, "_2") AS my_strings,
t.int64_col > 0 AS my_bools,
FROM `{scalars_table_id}` AS t
"""

df = session.read_gbq(query, configuration=config)

assert df.shape == (9, 3)


def test_read_gbq_model(session, penguins_linear_model_name):
model = session.read_gbq_model(penguins_linear_model_name)
assert isinstance(model, bigframes.ml.linear_model.LinearRegression)
16 changes: 12 additions & 4 deletions third_party/bigframes_vendored/pandas/io/gbq.py
@@ -3,7 +3,7 @@

from __future__ import annotations

from typing import Any, Iterable, Literal, Optional, Tuple, Union
from typing import Any, Dict, Iterable, Literal, Optional, Tuple, Union

from bigframes import constants

@@ -19,9 +19,10 @@ def read_gbq(
*,
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
configuration: Optional[Dict] = None,
max_results: Optional[int] = None,
filters: FiltersType = (),
use_cache: bool = True,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
):
"""Loads a DataFrame from BigQuery.
@@ -107,6 +108,11 @@ def read_gbq(
columns (Iterable[str]):
List of BigQuery column names in the desired order for results
DataFrame.
configuration (dict, optional):
Query config parameters for job processing.
For example: configuration = {'query': {'useQueryCache': False}}.
For more information see `BigQuery REST API Reference
<https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query>`__.
max_results (Optional[int], default None):
If set, limit the maximum number of rows to fetch from the
query results.
@@ -121,8 +127,10 @@ def read_gbq(
If using wildcard table suffix in query_or_table, can specify
'_table_suffix' pseudo column to filter the tables to be read
into the DataFrame.
use_cache (bool, default True):
Whether to cache the query inputs. Default to True.
use_cache (Optional[bool], default None):
Caches query results if set to `True`. When `None`, it behaves
as `True`, but should not be combined with `useQueryCache` in
`configuration` to avoid conflicts.
col_order (Iterable[str]):
Alias for columns, retained for backwards compatibility.
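
As the use_cache docstring above notes, the cache setting should be given in exactly one place; a short illustrative sketch of the new behaviour:

import bigframes.pandas as bpd

# Either of these is fine on its own:
df1 = bpd.read_gbq("SELECT 1 AS x", use_cache=False)
df2 = bpd.read_gbq("SELECT 1 AS x", configuration={"query": {"useQueryCache": False}})

# Specifying both raises:
# ValueError: 'useQueryCache' in 'configuration' conflicts with 'use_cache' parameter. Please specify only one.
# bpd.read_gbq("SELECT 1 AS x", use_cache=False,
#              configuration={"query": {"useQueryCache": False}})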
