Skip to content

Commit

Permalink
feat: support read_gbq wildcard table path (#377)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕
  • Loading branch information
GarrettWu authored Feb 13, 2024
1 parent 208e081 commit 90caf86
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 26 deletions.
2 changes: 2 additions & 0 deletions bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ def read_gbq_table(
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
max_results: Optional[int] = None,
filters: vendored_pandas_gbq.FiltersType = (),
use_cache: bool = True,
col_order: Iterable[str] = (),
) -> bigframes.dataframe.DataFrame:
Expand All @@ -561,6 +562,7 @@ def read_gbq_table(
index_col=index_col,
columns=columns,
max_results=max_results,
filters=filters,
use_cache=use_cache,
col_order=col_order,
)
Expand Down
64 changes: 44 additions & 20 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
Iterable,
List,
Literal,
Mapping,
MutableSequence,
Optional,
Sequence,
Expand Down Expand Up @@ -115,6 +116,11 @@ def _is_query(query_or_table: str) -> bool:
return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None


def _is_table_with_wildcard_suffix(query_or_table: str) -> bool:
    """Return True if `query_or_table` names a table path ending in a wildcard ("*")."""
    # A SQL query string is never a wildcard table path.
    if _is_query(query_or_table):
        return False
    return query_or_table.endswith("*")


class Session(
third_party_pandas_gbq.GBQIOMixin,
third_party_pandas_parquet.ParquetIOMixin,
Expand Down Expand Up @@ -248,7 +254,9 @@ def read_gbq(
elif col_order:
columns = col_order

query_or_table = self._filters_to_query(query_or_table, columns, filters)
filters = list(filters)
if len(filters) != 0 or _is_table_with_wildcard_suffix(query_or_table):
query_or_table = self._to_query(query_or_table, columns, filters)

if _is_query(query_or_table):
return self._read_gbq_query(
Expand All @@ -272,13 +280,18 @@ def read_gbq(
use_cache=use_cache,
)

def _filters_to_query(self, query_or_table, columns, filters):
"""Convert filters to query"""
if len(filters) == 0:
return query_or_table

def _to_query(
self,
query_or_table: str,
columns: Iterable[str],
filters: third_party_pandas_gbq.FiltersType,
) -> str:
"""Compile query_or_table with conditions(filters, wildcards) to query."""
filters = list(filters)
sub_query = (
f"({query_or_table})" if _is_query(query_or_table) else query_or_table
f"({query_or_table})"
if _is_query(query_or_table)
else f"`{query_or_table}`"
)

select_clause = "SELECT " + (
Expand All @@ -287,7 +300,7 @@ def _filters_to_query(self, query_or_table, columns, filters):

where_clause = ""
if filters:
valid_operators = {
valid_operators: Mapping[third_party_pandas_gbq.FilterOps, str] = {
"in": "IN",
"not in": "NOT IN",
"==": "=",
Expand All @@ -298,19 +311,16 @@ def _filters_to_query(self, query_or_table, columns, filters):
"!=": "!=",
}

if (
isinstance(filters, Iterable)
and isinstance(filters[0], Tuple)
and (len(filters[0]) == 0 or not isinstance(filters[0][0], Tuple))
# If single layer filter, add another pseudo layer. So the single layer represents "and" logic.
if isinstance(filters[0], tuple) and (
len(filters[0]) == 0 or not isinstance(list(filters[0])[0], tuple)
):
filters = [filters]
filters = typing.cast(third_party_pandas_gbq.FiltersType, [filters])

or_expressions = []
for group in filters:
if not isinstance(group, Iterable):
raise ValueError(
f"Filter group should be a iterable, {group} is not valid."
)
group = [group]

and_expressions = []
for filter_item in group:
Expand All @@ -329,13 +339,13 @@ def _filters_to_query(self, query_or_table, columns, filters):
if operator not in valid_operators:
raise ValueError(f"Operator {operator} is not valid.")

operator = valid_operators[operator]
operator_str = valid_operators[operator]

if operator in ["IN", "NOT IN"]:
if operator_str in ["IN", "NOT IN"]:
value_list = ", ".join([repr(v) for v in value])
expression = f"`{column}` {operator} ({value_list})"
expression = f"`{column}` {operator_str} ({value_list})"
else:
expression = f"`{column}` {operator} {repr(value)}"
expression = f"`{column}` {operator_str} {repr(value)}"
and_expressions.append(expression)

or_expressions.append(" AND ".join(and_expressions))
Expand Down Expand Up @@ -521,6 +531,7 @@ def read_gbq_table(
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
max_results: Optional[int] = None,
filters: third_party_pandas_gbq.FiltersType = (),
use_cache: bool = True,
col_order: Iterable[str] = (),
) -> dataframe.DataFrame:
Expand All @@ -546,6 +557,19 @@ def read_gbq_table(
elif col_order:
columns = col_order

filters = list(filters)
if len(filters) != 0 or _is_table_with_wildcard_suffix(query):
query = self._to_query(query, columns, filters)

return self._read_gbq_query(
query,
index_col=index_col,
columns=columns,
max_results=max_results,
api_name="read_gbq_table",
use_cache=use_cache,
)

return self._read_gbq_table(
query=query,
index_col=index_col,
Expand Down
26 changes: 26 additions & 0 deletions tests/system/small/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,32 @@ def test_read_gbq_twice_with_same_timestamp(session, penguins_table_id):
assert df3 is not None


def test_read_gbq_wildcard(session: bigframes.Session):
    """A wildcard table path should read all matching tables as one DataFrame."""
    expected_shape = (348485, 32)
    result = session.read_gbq("bigquery-public-data.noaa_gsod.gsod193*")
    assert result.shape == expected_shape


def test_read_gbq_wildcard_with_filter(session: bigframes.Session):
    """Filters on the `_table_suffix` pseudo column should narrow the wildcard match."""
    expected_shape = (348485, 32)
    result = session.read_gbq(
        "bigquery-public-data.noaa_gsod.gsod19*",
        filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")],  # type: ignore
    )
    assert result.shape == expected_shape


def test_read_gbq_table_wildcard(session: bigframes.Session):
    """`read_gbq_table` should accept a wildcard table path directly."""
    expected_shape = (348485, 32)
    result = session.read_gbq_table("bigquery-public-data.noaa_gsod.gsod193*")
    assert result.shape == expected_shape


def test_read_gbq_table_wildcard_with_filter(session: bigframes.Session):
    """`read_gbq_table` should combine a wildcard path with `_table_suffix` filters."""
    expected_shape = (348485, 32)
    result = session.read_gbq_table(
        "bigquery-public-data.noaa_gsod.gsod19*",
        filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")],  # type: ignore
    )
    assert result.shape == expected_shape


def test_read_gbq_model(session, penguins_linear_model_name):
model = session.read_gbq_model(penguins_linear_model_name)
assert isinstance(model, bigframes.ml.linear_model.LinearRegression)
Expand Down
31 changes: 28 additions & 3 deletions tests/unit/session/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def test_session_init_fails_with_no_project():
"test_table",
[],
[("date_col", ">", "2022-10-20")],
"SELECT * FROM test_table AS sub WHERE `date_col` > '2022-10-20'",
"SELECT * FROM `test_table` AS sub WHERE `date_col` > '2022-10-20'",
id="table_input",
),
pytest.param(
Expand All @@ -136,7 +136,7 @@ def test_session_init_fails_with_no_project():
(("string_col", "in", ["Hello, World!", "こんにちは"]),),
],
(
"SELECT `row_index`, `string_col` FROM test_table AS sub WHERE "
"SELECT `row_index`, `string_col` FROM `test_table` AS sub WHERE "
"`rowindex` NOT IN (0, 6) OR `string_col` IN ('Hello, World!', "
"'こんにちは')"
),
Expand All @@ -156,5 +156,30 @@ def test_session_init_fails_with_no_project():
)
def test_read_gbq_with_filters(query_or_table, columns, filters, expected_output):
    """Filter specs should compile to the expected SQL via Session._to_query."""
    bq_session = resources.create_bigquery_session()
    compiled = bq_session._to_query(query_or_table, columns, filters)
    assert compiled == expected_output


# Wildcard table paths and the SQL each should compile into.
_WILDCARD_QUERY_CASES = [
    pytest.param(
        "test_table*",
        [],
        [],
        "SELECT * FROM `test_table*` AS sub",
        id="wildcard_table_input",
    ),
    pytest.param(
        "test_table*",
        [],
        [("_TABLE_SUFFIX", ">", "2022-10-20")],
        "SELECT * FROM `test_table*` AS sub WHERE `_TABLE_SUFFIX` > '2022-10-20'",
        id="wildcard_table_input_with_filter",
    ),
]


@pytest.mark.parametrize(
    ("query_or_table", "columns", "filters", "expected_output"),
    _WILDCARD_QUERY_CASES,
)
def test_read_gbq_wildcard(query_or_table, columns, filters, expected_output):
    """Wildcard table paths should compile to the expected SQL via Session._to_query."""
    bq_session = resources.create_bigquery_session()
    compiled = bq_session._to_query(query_or_table, columns, filters)
    assert compiled == expected_output
15 changes: 12 additions & 3 deletions third_party/bigframes_vendored/pandas/io/gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

from bigframes import constants

# NOTE(review): this span is rendered diff residue — the first two assignments are
# the pre-change definitions and the last three are their replacements; if executed
# top-to-bottom, the later bindings take effect.
FilterType = Tuple[str, Literal["in", "not in", "<", "<=", "==", "!=", ">=", ">"], Any]
FiltersType = Iterable[Union[FilterType, Iterable[FilterType]]]
# Comparison/membership operators accepted in a filter tuple.
FilterOps = Literal["in", "not in", "<", "<=", "==", "!=", ">=", ">"]
# A single filter predicate: (column name, operator, value).
FilterType = Tuple[str, FilterOps, Any]
# Either a flat iterable of predicates (AND-combined) or an iterable of
# predicate groups (groups OR-combined, predicates within a group AND-combined).
FiltersType = Union[Iterable[FilterType], Iterable[Iterable[FilterType]]]


class GBQIOMixin:
Expand Down Expand Up @@ -52,6 +53,9 @@ def read_gbq(
>>> df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
Read table path with wildcard suffix and filters:
>>> df = bpd.read_gbq_table("bigquery-public-data.noaa_gsod.gsod19*", filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")])
Preserve ordering in a query input.
>>> df = bpd.read_gbq('''
Expand Down Expand Up @@ -96,6 +100,8 @@ def read_gbq(
A SQL string to be executed or a BigQuery table to be read. The
table must be specified in the format of
`project.dataset.tablename` or `dataset.tablename`.
Can also take a wildcard table name, such as `project.dataset.table_prefix*`.
In that case, all matching tables will be read as one DataFrame.
index_col (Iterable[str] or str):
Name of result column(s) to use for index in results DataFrame.
columns (Iterable[str]):
Expand All @@ -104,14 +110,17 @@ def read_gbq(
max_results (Optional[int], default None):
If set, limit the maximum number of rows to fetch from the
query results.
filters (Iterable[Union[Tuple, Iterable[Tuple]]], default ()): To
filters (Union[Iterable[FilterType], Iterable[Iterable[FilterType]]], default ()): To
filter out data. Filter syntax: [[(column, op, val), …],…] where
op is [==, >, >=, <, <=, !=, in, not in]. The innermost tuples
are transposed into a set of filters applied through an AND
operation. The outer Iterable combines these sets of filters
through an OR operation. A single Iterable of tuples can also
be used, meaning that no OR operation between set of filters
is to be conducted.
If using a wildcard table suffix in query_or_table, the
'_table_suffix' pseudo column can be specified in filters to select
which matching tables are read into the DataFrame.
use_cache (bool, default True):
Whether to cache the query inputs. Default to True.
col_order (Iterable[str]):
Expand Down

0 comments on commit 90caf86

Please sign in to comment.