Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support read_gbq wildcard table path #377

Merged
merged 7 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ def read_gbq_table(
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
max_results: Optional[int] = None,
filters: vendored_pandas_gbq.FiltersType = (),
use_cache: bool = True,
col_order: Iterable[str] = (),
) -> bigframes.dataframe.DataFrame:
Expand All @@ -561,6 +562,7 @@ def read_gbq_table(
index_col=index_col,
columns=columns,
max_results=max_results,
filters=filters,
use_cache=use_cache,
col_order=col_order,
)
Expand Down
64 changes: 44 additions & 20 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
Iterable,
List,
Literal,
Mapping,
MutableSequence,
Optional,
Sequence,
Expand Down Expand Up @@ -115,6 +116,11 @@ def _is_query(query_or_table: str) -> bool:
return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None


def _is_table_with_wildcard_suffix(query_or_table: str) -> bool:
    """Return True when `query_or_table` names a table path ending in a wildcard suffix."""
    # A SQL query string is never a wildcard table path.
    if _is_query(query_or_table):
        return False
    return query_or_table.endswith("*")


class Session(
third_party_pandas_gbq.GBQIOMixin,
third_party_pandas_parquet.ParquetIOMixin,
Expand Down Expand Up @@ -248,7 +254,9 @@ def read_gbq(
elif col_order:
columns = col_order

query_or_table = self._filters_to_query(query_or_table, columns, filters)
filters = list(filters)
if len(filters) != 0 or _is_table_with_wildcard_suffix(query_or_table):
query_or_table = self._to_query(query_or_table, columns, filters)

if _is_query(query_or_table):
return self._read_gbq_query(
Expand All @@ -272,13 +280,18 @@ def read_gbq(
use_cache=use_cache,
)

def _filters_to_query(self, query_or_table, columns, filters):
"""Convert filters to query"""
if len(filters) == 0:
return query_or_table

def _to_query(
self,
query_or_table: str,
columns: Iterable[str],
filters: third_party_pandas_gbq.FiltersType,
) -> str:
"""Compile query_or_table with conditions(filters, wildcards) to query."""
filters = list(filters)
sub_query = (
f"({query_or_table})" if _is_query(query_or_table) else query_or_table
f"({query_or_table})"
if _is_query(query_or_table)
else f"`{query_or_table}`"
)

select_clause = "SELECT " + (
Expand All @@ -287,7 +300,7 @@ def _filters_to_query(self, query_or_table, columns, filters):

where_clause = ""
if filters:
valid_operators = {
valid_operators: Mapping[third_party_pandas_gbq.FilterOps, str] = {
"in": "IN",
"not in": "NOT IN",
"==": "=",
Expand All @@ -298,19 +311,16 @@ def _filters_to_query(self, query_or_table, columns, filters):
"!=": "!=",
}

if (
isinstance(filters, Iterable)
and isinstance(filters[0], Tuple)
and (len(filters[0]) == 0 or not isinstance(filters[0][0], Tuple))
# If single layer filter, add another pseudo layer. So the single layer represents "and" logic.
if isinstance(filters[0], tuple) and (
len(filters[0]) == 0 or not isinstance(list(filters[0])[0], tuple)
):
filters = [filters]
filters = typing.cast(third_party_pandas_gbq.FiltersType, [filters])

or_expressions = []
for group in filters:
if not isinstance(group, Iterable):
raise ValueError(
f"Filter group should be a iterable, {group} is not valid."
)
group = [group]

and_expressions = []
for filter_item in group:
Expand All @@ -329,13 +339,13 @@ def _filters_to_query(self, query_or_table, columns, filters):
if operator not in valid_operators:
raise ValueError(f"Operator {operator} is not valid.")

operator = valid_operators[operator]
operator_str = valid_operators[operator]

if operator in ["IN", "NOT IN"]:
if operator_str in ["IN", "NOT IN"]:
value_list = ", ".join([repr(v) for v in value])
expression = f"`{column}` {operator} ({value_list})"
expression = f"`{column}` {operator_str} ({value_list})"
else:
expression = f"`{column}` {operator} {repr(value)}"
expression = f"`{column}` {operator_str} {repr(value)}"
and_expressions.append(expression)

or_expressions.append(" AND ".join(and_expressions))
Expand Down Expand Up @@ -521,6 +531,7 @@ def read_gbq_table(
index_col: Iterable[str] | str = (),
columns: Iterable[str] = (),
max_results: Optional[int] = None,
filters: third_party_pandas_gbq.FiltersType = (),
use_cache: bool = True,
col_order: Iterable[str] = (),
) -> dataframe.DataFrame:
Expand All @@ -546,6 +557,19 @@ def read_gbq_table(
elif col_order:
columns = col_order

filters = list(filters)
if len(filters) != 0 or _is_table_with_wildcard_suffix(query):
query = self._to_query(query, columns, filters)

return self._read_gbq_query(
query,
index_col=index_col,
columns=columns,
max_results=max_results,
api_name="read_gbq_table",
use_cache=use_cache,
)

return self._read_gbq_table(
query=query,
index_col=index_col,
Expand Down
26 changes: 26 additions & 0 deletions tests/system/small/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,32 @@ def test_read_gbq_twice_with_same_timestamp(session, penguins_table_id):
assert df3 is not None


def test_read_gbq_wildcard(session: bigframes.Session):
    """A wildcard table path passed to read_gbq reads all matched tables."""
    result = session.read_gbq("bigquery-public-data.noaa_gsod.gsod193*")
    assert result.shape == (348485, 32)


def test_read_gbq_wildcard_with_filter(session: bigframes.Session):
    """_table_suffix filters restrict which wildcard-matched tables are read."""
    suffix_range = [("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")]
    result = session.read_gbq(
        "bigquery-public-data.noaa_gsod.gsod19*",
        filters=suffix_range,  # type: ignore
    )
    assert result.shape == (348485, 32)


def test_read_gbq_table_wildcard(session: bigframes.Session):
    """A wildcard table path passed to read_gbq_table reads all matched tables."""
    result = session.read_gbq_table("bigquery-public-data.noaa_gsod.gsod193*")
    assert result.shape == (348485, 32)


def test_read_gbq_table_wildcard_with_filter(session: bigframes.Session):
    """read_gbq_table honors _table_suffix filters on a wildcard table path."""
    suffix_range = [("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")]
    result = session.read_gbq_table(
        "bigquery-public-data.noaa_gsod.gsod19*",
        filters=suffix_range,  # type: ignore
    )
    assert result.shape == (348485, 32)


def test_read_gbq_model(session, penguins_linear_model_name):
    """A BQML model loads as the corresponding bigframes.ml estimator class."""
    loaded = session.read_gbq_model(penguins_linear_model_name)
    assert isinstance(loaded, bigframes.ml.linear_model.LinearRegression)
Expand Down
31 changes: 28 additions & 3 deletions tests/unit/session/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def test_session_init_fails_with_no_project():
"test_table",
[],
[("date_col", ">", "2022-10-20")],
"SELECT * FROM test_table AS sub WHERE `date_col` > '2022-10-20'",
"SELECT * FROM `test_table` AS sub WHERE `date_col` > '2022-10-20'",
id="table_input",
),
pytest.param(
Expand All @@ -136,7 +136,7 @@ def test_session_init_fails_with_no_project():
(("string_col", "in", ["Hello, World!", "こんにちは"]),),
],
(
"SELECT `row_index`, `string_col` FROM test_table AS sub WHERE "
"SELECT `row_index`, `string_col` FROM `test_table` AS sub WHERE "
"`rowindex` NOT IN (0, 6) OR `string_col` IN ('Hello, World!', "
"'こんにちは')"
),
Expand All @@ -156,5 +156,30 @@ def test_session_init_fails_with_no_project():
)
def test_read_gbq_with_filters(query_or_table, columns, filters, expected_output):
    """_to_query compiles a table/query input plus filters into the expected SQL."""
    compiled = resources.create_bigquery_session()._to_query(
        query_or_table, columns, filters
    )
    assert compiled == expected_output


@pytest.mark.parametrize(
    ("query_or_table", "columns", "filters", "expected_output"),
    [
        pytest.param(
            "test_table*",
            [],
            [],
            "SELECT * FROM `test_table*` AS sub",
            id="wildcard_table_input",
        ),
        pytest.param(
            "test_table*",
            [],
            [("_TABLE_SUFFIX", ">", "2022-10-20")],
            "SELECT * FROM `test_table*` AS sub WHERE `_TABLE_SUFFIX` > '2022-10-20'",
            id="wildcard_table_input_with_filter",
        ),
    ],
)
def test_read_gbq_wildcard(query_or_table, columns, filters, expected_output):
    """Wildcard table paths compile to backtick-quoted SQL, with optional filters."""
    compiled = resources.create_bigquery_session()._to_query(
        query_or_table, columns, filters
    )
    assert compiled == expected_output
15 changes: 12 additions & 3 deletions third_party/bigframes_vendored/pandas/io/gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

from bigframes import constants

FilterType = Tuple[str, Literal["in", "not in", "<", "<=", "==", "!=", ">=", ">"], Any]
FiltersType = Iterable[Union[FilterType, Iterable[FilterType]]]
# Comparison operators accepted in a single filter predicate.
FilterOps = Literal["in", "not in", "<", "<=", "==", "!=", ">=", ">"]
# One predicate: (column name, operator, comparison value).
FilterType = Tuple[str, FilterOps, Any]
# Either a flat iterable of predicates (combined with AND), or an iterable of
# predicate groups (groups combined with OR, predicates within a group with AND).
FiltersType = Union[Iterable[FilterType], Iterable[Iterable[FilterType]]]


class GBQIOMixin:
Expand Down Expand Up @@ -52,6 +53,9 @@ def read_gbq(

>>> df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")

Read a table path with a wildcard suffix and filters:

>>> df = bpd.read_gbq("bigquery-public-data.noaa_gsod.gsod19*", filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")])

Preserve ordering in a query input.

>>> df = bpd.read_gbq('''
Expand Down Expand Up @@ -96,6 +100,8 @@ def read_gbq(
A SQL string to be executed or a BigQuery table to be read. The
table must be specified in the format of
`project.dataset.tablename` or `dataset.tablename`.
Can also take a wildcard table name, such as `project.dataset.table_prefix*`.
In that case, all matched tables will be read as one DataFrame.
index_col (Iterable[str] or str):
Name of result column(s) to use for index in results DataFrame.
columns (Iterable[str]):
Expand All @@ -104,14 +110,17 @@ def read_gbq(
max_results (Optional[int], default None):
If set, limit the maximum number of rows to fetch from the
query results.
filters (Iterable[Union[Tuple, Iterable[Tuple]]], default ()): To
filters (Union[Iterable[FilterType], Iterable[Iterable[FilterType]]], default ()): To
filter out data. Filter syntax: [[(column, op, val), …],…] where
op is [==, >, >=, <, <=, !=, in, not in]. The innermost tuples
are transposed into a set of filters applied through an AND
operation. The outer Iterable combines these sets of filters
through an OR operation. A single Iterable of tuples can also
be used, meaning that no OR operation between set of filters
is to be conducted.
If using wildcard table suffix in query_or_table, can specify
'_table_suffix' pseudo column to filter the tables to be read
into the DataFrame.
use_cache (bool, default True):
Whether to cache the query inputs. Default to True.
col_order (Iterable[str]):
Expand Down