feat: add configuration option to read_gbq #401

Merged · 14 commits · Mar 22, 2024
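For context, the new surface this PR adds can be exercised as below — a minimal sketch, not part of the diff; the public-dataset table is a placeholder and any query works:

```python
import bigframes.pandas as bpd

# Placeholder query against a public dataset.
sql = """
SELECT name, SUM(number) AS total
FROM `bigquery-public-data.usa_names.usa_1910_2013`
GROUP BY name
"""

# The new `configuration` argument accepts the REST API's JobConfiguration
# shape, e.g. disabling the query cache and capping bytes billed.
df = bpd.read_gbq(
    sql,
    configuration={
        "query": {
            "useQueryCache": False,
            "maximumBytesBilled": "1000000000",
        }
    },
)
```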
8 changes: 6 additions & 2 deletions bigframes/pandas/__init__.py
@@ -491,9 +491,10 @@ def read_gbq(
*,
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
configuration: Optional[dict] = None,
max_results: Optional[int] = None,
filters: vendored_pandas_gbq.FiltersType = (),
use_cache: bool = True,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
) -> bigframes.dataframe.DataFrame:
_set_default_session_location_if_possible(query_or_table)
@@ -502,6 +503,7 @@
query_or_table,
index_col=index_col,
columns=columns,
configuration=configuration,
max_results=max_results,
filters=filters,
use_cache=use_cache,
@@ -527,8 +529,9 @@ def read_gbq_query(
*,
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
configuration: Optional[dict] = None,
max_results: Optional[int] = None,
use_cache: bool = True,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
) -> bigframes.dataframe.DataFrame:
_set_default_session_location_if_possible(query)
@@ -537,6 +540,7 @@
query,
index_col=index_col,
columns=columns,
configuration=configuration,
max_results=max_results,
use_cache=use_cache,
col_order=col_order,
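`read_gbq_query` gets the same pass-through; a minimal sketch (placeholder query, not part of the diff):

```python
import bigframes.pandas as bpd

# `read_gbq_query` forwards `configuration` to the session the same way;
# the legacy client-side timeoutMs is accepted and rewritten to jobTimeoutMs.
df = bpd.read_gbq_query(
    "SELECT 1 AS x",  # placeholder query
    configuration={"query": {"timeoutMs": 10000}},
)
```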
95 changes: 83 additions & 12 deletions bigframes/session/__init__.py
@@ -16,6 +16,7 @@

from __future__ import annotations

import copy
import datetime
import itertools
import logging
@@ -255,9 +256,10 @@ def read_gbq(
*,
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
configuration: Optional[dict] = None,
max_results: Optional[int] = None,
filters: third_party_pandas_gbq.FiltersType = (),
use_cache: bool = True,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
# Add a verify index argument that fails if the index is not unique.
) -> dataframe.DataFrame:
@@ -278,6 +280,7 @@
query_or_table,
index_col=index_col,
columns=columns,
configuration=configuration,
max_results=max_results,
api_name="read_gbq",
use_cache=use_cache,
@@ -286,13 +289,20 @@
# TODO(swast): Query the snapshot table but mark it as a
# deterministic query so we can avoid serializing if we have a
# unique index.
if configuration is not None:
raise ValueError(
"The 'configuration' argument is not allowed when "
"directly reading from a table. Please remove "
"'configuration' or use a query."
)

return self._read_gbq_table(
query_or_table,
index_col=index_col,
columns=columns,
max_results=max_results,
api_name="read_gbq",
use_cache=use_cache,
use_cache=use_cache if use_cache is not None else True,
)

def _to_query(
@@ -376,7 +386,7 @@ def _query_to_destination(
query: str,
index_cols: List[str],
api_name: str,
use_cache: bool = True,
configuration: dict = {"query": {"useQueryCache": True}},
) -> Tuple[Optional[bigquery.TableReference], Optional[bigquery.QueryJob]]:
# If a dry_run indicates this is not a query type job, then don't
# bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
@@ -398,23 +408,35 @@
][:_MAX_CLUSTER_COLUMNS]
temp_table = self._create_empty_temp_table(schema, cluster_cols)

job_config = bigquery.QueryJobConfig()
timeout_ms = configuration.get("jobTimeoutMs") or configuration["query"].get(
"timeoutMs"
)

# Convert timeout_ms to seconds, ensuring a minimum of 0.1 seconds to avoid
# the program getting stuck on too-short timeouts.
timeout = max(int(timeout_ms) * 1e-3, 0.1) if timeout_ms else None

job_config = typing.cast(
bigquery.QueryJobConfig,
bigquery.QueryJobConfig.from_api_repr(configuration),
)
job_config.labels["bigframes-api"] = api_name
job_config.destination = temp_table
job_config.use_query_cache = use_cache

try:
# Write to temp table to work around BigQuery's 10 GB query results
# limit. See: internal issue 303057336.
job_config.labels["error_caught"] = "true"
_, query_job = self._start_query(query, job_config=job_config)
_, query_job = self._start_query(
query, job_config=job_config, timeout=timeout
)
return query_job.destination, query_job
except google.api_core.exceptions.BadRequest:
# Some SELECT statements still aren't compatible with cluster
# tables as the destination. For example, if the query has a
# top-level ORDER BY, this conflicts with our ability to cluster
# the table by the index column(s).
_, query_job = self._start_query(query)
_, query_job = self._start_query(query, timeout=timeout)
return query_job.destination, query_job

def read_gbq_query(
@@ -423,8 +445,9 @@ def read_gbq_query(
*,
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
configuration: Optional[dict] = None,
max_results: Optional[int] = None,
use_cache: bool = True,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
) -> dataframe.DataFrame:
"""Turn a SQL query into a DataFrame.
@@ -488,6 +511,7 @@
query=query,
index_col=index_col,
columns=columns,
configuration=configuration,
max_results=max_results,
api_name="read_gbq_query",
use_cache=use_cache,
@@ -499,10 +523,34 @@ def _read_gbq_query(
*,
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
configuration: Optional[dict] = None,
max_results: Optional[int] = None,
api_name: str = "read_gbq_query",
use_cache: bool = True,
use_cache: Optional[bool] = None,
) -> dataframe.DataFrame:
configuration = _transform_read_gbq_configuration(configuration)

if "query" not in configuration:
configuration["query"] = {}

if "query" in configuration["query"]:
raise ValueError(
"The query statement must not be included in the "
"'configuration' because it is already provided as"
" a separate parameter."
)

if "useQueryCache" in configuration["query"]:
if use_cache is not None:
raise ValueError(
"'useQueryCache' in 'configuration' conflicts with"
" 'use_cache' parameter. Please specify only one."
)
else:
configuration["query"]["useQueryCache"] = (
True if use_cache is None else use_cache
)

if isinstance(index_col, str):
index_cols = [index_col]
else:
@@ -512,7 +560,7 @@
query,
index_cols,
api_name=api_name,
use_cache=use_cache,
configuration=configuration,
)

# If there was no destination table, that means the query must have
@@ -536,7 +584,7 @@
index_col=index_cols,
columns=columns,
max_results=max_results,
use_cache=use_cache,
use_cache=configuration["query"]["useQueryCache"],
)

def read_gbq_table(
@@ -1601,13 +1649,14 @@ def _start_query(
sql: str,
job_config: Optional[bigquery.job.QueryJobConfig] = None,
max_results: Optional[int] = None,
timeout: Optional[float] = None,
) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
"""
Starts a BigQuery query job and waits for results.
"""
job_config = self._prepare_query_job_config(job_config)
return bigframes.session._io.bigquery.start_query_with_client(
self.bqclient, sql, job_config, max_results
self.bqclient, sql, job_config, max_results, timeout
)

def _start_query_ml_ddl(
@@ -1821,3 +1870,25 @@ def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringVa
# Escape backslashes and use backslash as delineator
escaped = typing.cast(ibis_types.StringColumn, result.fillna("")).replace("\\", "\\\\") # type: ignore
return typing.cast(ibis_types.StringColumn, ibis.literal("\\")).concat(escaped)


def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict:
"""
For backwards compatibility, convert any previously client-side-only
parameters, such as timeoutMs, to the property name expected by the
REST API.

Makes a copy of the configuration if changes are needed.
"""

if configuration is None:
return {}

timeout_ms = configuration.get("query", {}).get("timeoutMs")
if timeout_ms is not None:
# Transform timeoutMs to an actual server-side configuration.
# https://github.com/googleapis/python-bigquery-pandas/issues/479
configuration = copy.deepcopy(configuration)
del configuration["query"]["timeoutMs"]
configuration["jobTimeoutMs"] = timeout_ms

return configuration
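To make the shim above concrete, here is a standalone sketch of the same rewrite; `transform_config` is a hypothetical name used to avoid clashing with the private helper:

```python
import copy
from typing import Optional

def transform_config(configuration: Optional[dict]) -> dict:
    # Mirror _transform_read_gbq_configuration: move the legacy
    # client-side query.timeoutMs into the server-side jobTimeoutMs.
    if configuration is None:
        return {}
    timeout_ms = configuration.get("query", {}).get("timeoutMs")
    if timeout_ms is not None:
        configuration = copy.deepcopy(configuration)  # don't mutate the caller's dict
        del configuration["query"]["timeoutMs"]
        configuration["jobTimeoutMs"] = timeout_ms
    return configuration

assert transform_config(None) == {}
assert transform_config({"query": {"useQueryCache": True}}) == {"query": {"useQueryCache": True}}
assert transform_config({"query": {"timeoutMs": 10000}}) == {"query": {}, "jobTimeoutMs": 10000}
```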
3 changes: 2 additions & 1 deletion bigframes/session/_io/bigquery.py
@@ -220,6 +220,7 @@ def start_query_with_client(
sql: str,
job_config: bigquery.job.QueryJobConfig,
max_results: Optional[int] = None,
timeout: Optional[float] = None,
) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
"""
Starts a query job and waits for results.
@@ -230,7 +231,7 @@
)

try:
query_job = bq_client.query(sql, job_config=job_config)
query_job = bq_client.query(sql, job_config=job_config, timeout=timeout)
except google.api_core.exceptions.Forbidden as ex:
if "Drive credentials" in ex.message:
ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
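For reference, the `timeout` forwarded to `bq_client.query` here is derived in `_query_to_destination` above by converting milliseconds to seconds with a 0.1 s floor; a standalone sketch of that conversion (the `timeout_seconds` helper is hypothetical):

```python
from typing import Optional

def timeout_seconds(configuration: dict) -> Optional[float]:
    # Prefer the server-side jobTimeoutMs; fall back to the legacy
    # query.timeoutMs if a caller supplied it directly.
    timeout_ms = configuration.get("jobTimeoutMs") or configuration.get(
        "query", {}
    ).get("timeoutMs")
    # Clamp to a 0.1 s floor so an extremely short timeout cannot
    # wedge the client while waiting for the job.
    return max(int(timeout_ms) * 1e-3, 0.1) if timeout_ms else None

assert timeout_seconds({"jobTimeoutMs": 10000}) == 10.0
assert timeout_seconds({"query": {"timeoutMs": 50}}) == 0.1
assert timeout_seconds({"query": {}}) is None
```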
41 changes: 41 additions & 0 deletions tests/system/small/test_session.py
@@ -20,6 +20,8 @@
import typing
from typing import List

import google
from google.api_core.exceptions import InternalServerError
import google.cloud.bigquery as bigquery
import numpy as np
import pandas as pd
@@ -353,6 +355,45 @@ def test_read_gbq_table_wildcard_with_filter(session: bigframes.Session):
assert df.shape == (348485, 32)


@pytest.mark.parametrize(
("config"),
[
{
"query": {
"useQueryCache": True,
"maximumBytesBilled": "1000000000",
"timeoutMs": 10000,
}
},
pytest.param(
{"query": {"useQueryCache": True, "timeoutMs": 50}},
marks=pytest.mark.xfail(
raises=google.api_core.exceptions.BadRequest,
),
),
pytest.param(
{"query": {"useQueryCache": False, "maximumBytesBilled": "100"}},
marks=pytest.mark.xfail(
raises=InternalServerError,
),
),
],
)
def test_read_gbq_with_configuration(
session: bigframes.Session, scalars_table_id: str, config: dict
):
query = f"""SELECT
t.float64_col * 2 AS my_floats,
CONCAT(t.string_col, "_2") AS my_strings,
t.int64_col > 0 AS my_bools,
FROM `{scalars_table_id}` AS t
"""

df = session.read_gbq(query, configuration=config)

assert df.shape == (9, 3)


def test_read_gbq_model(session, penguins_linear_model_name):
model = session.read_gbq_model(penguins_linear_model_name)
assert isinstance(model, bigframes.ml.linear_model.LinearRegression)
14 changes: 11 additions & 3 deletions third_party/bigframes_vendored/pandas/io/gbq.py
@@ -19,9 +19,10 @@ def read_gbq(
*,
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
configuration: Optional[dict] = None,
max_results: Optional[int] = None,
filters: FiltersType = (),
use_cache: bool = True,
use_cache: Optional[bool] = None,
col_order: Iterable[str] = (),
):
"""Loads a DataFrame from BigQuery.
@@ -107,6 +108,11 @@ def read_gbq(
columns (Iterable[str]):
List of BigQuery column names in the desired order for results
DataFrame.
configuration (dict, optional):
Query config parameters for job processing.
For example: configuration = {'query': {'useQueryCache': False}}.
For more information, see the `BigQuery REST API Reference
<https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query>`__.
max_results (Optional[int], default None):
If set, limit the maximum number of rows to fetch from the
query results.
@@ -121,8 +127,10 @@
If using wildcard table suffix in query_or_table, can specify
'_table_suffix' pseudo column to filter the tables to be read
into the DataFrame.
use_cache (bool, default True):
Whether to cache the query inputs. Default to True.
use_cache (Optional[bool], default None):
Whether to cache query results. When `None`, it defaults to
`True`. Do not combine with `useQueryCache` in `configuration`;
specifying both raises a `ValueError`.
col_order (Iterable[str]):
Alias for columns, retained for backwards compatibility.

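Taken together, the docstring describes two equivalent ways to control caching. A short usage sketch (placeholder query), including the conflict the session code rejects:

```python
import bigframes.pandas as bpd

sql = "SELECT 1 AS x"  # placeholder query

# Option 1: the dedicated keyword.
df1 = bpd.read_gbq(sql, use_cache=False)

# Option 2: the REST-style job configuration.
df2 = bpd.read_gbq(sql, configuration={"query": {"useQueryCache": False}})

# Setting both raises ValueError, per the validation in _read_gbq_query.
try:
    bpd.read_gbq(
        sql,
        use_cache=False,
        configuration={"query": {"useQueryCache": False}},
    )
except ValueError as exc:
    print(exc)  # "'useQueryCache' in 'configuration' conflicts with 'use_cache' parameter..."
```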