From 90caf865efc940f94e16643bda7ba261c2f2e473 Mon Sep 17 00:00:00 2001 From: Garrett Wu <6505921+GarrettWu@users.noreply.github.com> Date: Mon, 12 Feb 2024 17:40:20 -0800 Subject: [PATCH] feat: support read_gbq wildcard table path (#377) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bigframes/pandas/__init__.py | 2 + bigframes/session/__init__.py | 64 +++++++++++++------ tests/system/small/test_session.py | 26 ++++++++ tests/unit/session/test_session.py | 31 ++++++++- .../bigframes_vendored/pandas/io/gbq.py | 15 ++++- 5 files changed, 112 insertions(+), 26 deletions(-) diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 5320e84e21..110978a7f1 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -551,6 +551,7 @@ def read_gbq_table( index_col: Iterable[str] | str = (), columns: Iterable[str] = (), max_results: Optional[int] = None, + filters: vendored_pandas_gbq.FiltersType = (), use_cache: bool = True, col_order: Iterable[str] = (), ) -> bigframes.dataframe.DataFrame: @@ -561,6 +562,7 @@ def read_gbq_table( index_col=index_col, columns=columns, max_results=max_results, + filters=filters, use_cache=use_cache, col_order=col_order, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 15d4b3577b..df0cd6e947 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -30,6 +30,7 @@ Iterable, List, Literal, + Mapping, MutableSequence, Optional, Sequence, @@ -115,6 +116,11 @@ def _is_query(query_or_table: str) -> bool: return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None +def _is_table_with_wildcard_suffix(query_or_table: str) -> bool: + """Determine if `query_or_table` is a table and contains a wildcard suffix.""" + return not _is_query(query_or_table) and query_or_table.endswith("*") + + class Session( third_party_pandas_gbq.GBQIOMixin, third_party_pandas_parquet.ParquetIOMixin, @@ -248,7 +254,9 @@ def read_gbq( elif col_order: columns = col_order - query_or_table = self._filters_to_query(query_or_table, columns, filters) + filters = list(filters) + if len(filters) != 0 or _is_table_with_wildcard_suffix(query_or_table): + query_or_table = self._to_query(query_or_table, columns, filters) if _is_query(query_or_table): return self._read_gbq_query( @@ -272,13 +280,18 @@ def read_gbq( use_cache=use_cache, ) - def _filters_to_query(self, query_or_table, columns, filters): - """Convert filters to query""" - if len(filters) == 0: - return query_or_table - + def _to_query( + self, + query_or_table: str, + columns: Iterable[str], + filters: third_party_pandas_gbq.FiltersType, + ) -> str: + """Compile query_or_table with conditions(filters, wildcards) to query.""" + filters = list(filters) sub_query = ( - f"({query_or_table})" if _is_query(query_or_table) else query_or_table + f"({query_or_table})" + if _is_query(query_or_table) + else f"`{query_or_table}`" ) select_clause = "SELECT " + ( @@ -287,7 +300,7
@@ def _filters_to_query(self, query_or_table, columns, filters): where_clause = "" if filters: - valid_operators = { + valid_operators: Mapping[third_party_pandas_gbq.FilterOps, str] = { "in": "IN", "not in": "NOT IN", "==": "=", @@ -298,19 +311,16 @@ def _filters_to_query(self, query_or_table, columns, filters): "!=": "!=", } - if ( - isinstance(filters, Iterable) - and isinstance(filters[0], Tuple) - and (len(filters[0]) == 0 or not isinstance(filters[0][0], Tuple)) + # If single layer filter, add another pseudo layer. So the single layer represents "and" logic. + if isinstance(filters[0], tuple) and ( + len(filters[0]) == 0 or not isinstance(list(filters[0])[0], tuple) ): - filters = [filters] + filters = typing.cast(third_party_pandas_gbq.FiltersType, [filters]) or_expressions = [] for group in filters: if not isinstance(group, Iterable): - raise ValueError( - f"Filter group should be a iterable, {group} is not valid." - ) + group = [group] and_expressions = [] for filter_item in group: @@ -329,13 +339,13 @@ def _filters_to_query(self, query_or_table, columns, filters): if operator not in valid_operators: raise ValueError(f"Operator {operator} is not valid.") - operator = valid_operators[operator] + operator_str = valid_operators[operator] - if operator in ["IN", "NOT IN"]: + if operator_str in ["IN", "NOT IN"]: value_list = ", ".join([repr(v) for v in value]) - expression = f"`{column}` {operator} ({value_list})" + expression = f"`{column}` {operator_str} ({value_list})" else: - expression = f"`{column}` {operator} {repr(value)}" + expression = f"`{column}` {operator_str} {repr(value)}" and_expressions.append(expression) or_expressions.append(" AND ".join(and_expressions)) @@ -521,6 +531,7 @@ def read_gbq_table( index_col: Iterable[str] | str = (), columns: Iterable[str] = (), max_results: Optional[int] = None, + filters: third_party_pandas_gbq.FiltersType = (), use_cache: bool = True, col_order: Iterable[str] = (), ) -> dataframe.DataFrame: @@ -546,6 +557,19 @@ def read_gbq_table( elif col_order: columns = col_order + filters = list(filters) + if len(filters) != 0 or _is_table_with_wildcard_suffix(query): + query = self._to_query(query, columns, filters) + + return self._read_gbq_query( + query, + index_col=index_col, + columns=columns, + max_results=max_results, + api_name="read_gbq_table", + use_cache=use_cache, + ) + return self._read_gbq_table( query=query, index_col=index_col, diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 2d9c332de1..85573472b9 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -327,6 +327,32 @@ def test_read_gbq_twice_with_same_timestamp(session, penguins_table_id): assert df3 is not None +def test_read_gbq_wildcard(session: bigframes.Session): + df = session.read_gbq("bigquery-public-data.noaa_gsod.gsod193*") + assert df.shape == (348485, 32) + + +def test_read_gbq_wildcard_with_filter(session: bigframes.Session): + df = session.read_gbq( + "bigquery-public-data.noaa_gsod.gsod19*", + filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")], # type: ignore + ) + assert df.shape == (348485, 32) + + +def test_read_gbq_table_wildcard(session: bigframes.Session): + df = session.read_gbq_table("bigquery-public-data.noaa_gsod.gsod193*") + assert df.shape == (348485, 32) + + +def test_read_gbq_table_wildcard_with_filter(session: bigframes.Session): + df = session.read_gbq_table( + "bigquery-public-data.noaa_gsod.gsod19*", + filters=[("_table_suffix", ">=", "30"), 
("_table_suffix", "<=", "39")], # type: ignore + ) + assert df.shape == (348485, 32) + + def test_read_gbq_model(session, penguins_linear_model_name): model = session.read_gbq_model(penguins_linear_model_name) assert isinstance(model, bigframes.ml.linear_model.LinearRegression) diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index ea8d0882ae..b474c9f63e 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -125,7 +125,7 @@ def test_session_init_fails_with_no_project(): "test_table", [], [("date_col", ">", "2022-10-20")], - "SELECT * FROM test_table AS sub WHERE `date_col` > '2022-10-20'", + "SELECT * FROM `test_table` AS sub WHERE `date_col` > '2022-10-20'", id="table_input", ), pytest.param( @@ -136,7 +136,7 @@ def test_session_init_fails_with_no_project(): (("string_col", "in", ["Hello, World!", "こんにちは"]),), ], ( - "SELECT `row_index`, `string_col` FROM test_table AS sub WHERE " + "SELECT `row_index`, `string_col` FROM `test_table` AS sub WHERE " "`rowindex` NOT IN (0, 6) OR `string_col` IN ('Hello, World!', " "'こんにちは')" ), @@ -156,5 +156,30 @@ def test_session_init_fails_with_no_project(): ) def test_read_gbq_with_filters(query_or_table, columns, filters, expected_output): session = resources.create_bigquery_session() - query = session._filters_to_query(query_or_table, columns, filters) + query = session._to_query(query_or_table, columns, filters) + assert query == expected_output + + +@pytest.mark.parametrize( + ("query_or_table", "columns", "filters", "expected_output"), + [ + pytest.param( + "test_table*", + [], + [], + "SELECT * FROM `test_table*` AS sub", + id="wildcard_table_input", + ), + pytest.param( + "test_table*", + [], + [("_TABLE_SUFFIX", ">", "2022-10-20")], + "SELECT * FROM `test_table*` AS sub WHERE `_TABLE_SUFFIX` > '2022-10-20'", + id="wildcard_table_input_with_filter", + ), + ], +) +def test_read_gbq_wildcard(query_or_table, columns, filters, expected_output): + session = resources.create_bigquery_session() + query = session._to_query(query_or_table, columns, filters) assert query == expected_output diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py index 8e2c9f092d..1f31c530d2 100644 --- a/third_party/bigframes_vendored/pandas/io/gbq.py +++ b/third_party/bigframes_vendored/pandas/io/gbq.py @@ -7,8 +7,9 @@ from bigframes import constants -FilterType = Tuple[str, Literal["in", "not in", "<", "<=", "==", "!=", ">=", ">"], Any] -FiltersType = Iterable[Union[FilterType, Iterable[FilterType]]] +FilterOps = Literal["in", "not in", "<", "<=", "==", "!=", ">=", ">"] +FilterType = Tuple[str, FilterOps, Any] +FiltersType = Union[Iterable[FilterType], Iterable[Iterable[FilterType]]] class GBQIOMixin: @@ -52,6 +53,9 @@ def read_gbq( >>> df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins") + Read table path with wildcard suffix and filters: + >>> df = bpd.read_gbq_table("bigquery-public-data.noaa_gsod.gsod19*", filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")]) + Preserve ordering in a query input. >>> df = bpd.read_gbq(''' @@ -96,6 +100,8 @@ def read_gbq( A SQL string to be executed or a BigQuery table to be read. The table must be specified in the format of `project.dataset.tablename` or `dataset.tablename`. + Can also take wildcard table name, such as `project.dataset.table_prefix*`. + In tha case, will read all the matched table as one DataFrame. 
index_col (Iterable[str] or str): Name of result column(s) to use for index in results DataFrame. columns (Iterable[str]): @@ -104,7 +110,7 @@ def read_gbq( max_results (Optional[int], default None): If set, limit the maximum number of rows to fetch from the query results. - filters (Iterable[Union[Tuple, Iterable[Tuple]]], default ()): To + filters (Union[Iterable[FilterType], Iterable[Iterable[FilterType]]], default ()): To filter out data. Filter syntax: [[(column, op, val), …],…] where op is [==, >, >=, <, <=, !=, in, not in]. The innermost tuples are transposed into a set of filters applied through an AND @@ -112,6 +118,9 @@ def read_gbq( through an OR operation. A single Iterable of tuples can also be used, meaning that no OR operation between set of filters is to be conducted. + If using wildcard table suffix in query_or_table, can specify + '_table_suffix' pseudo column to filter the tables to be read + into the DataFrame. use_cache (bool, default True): Whether to cache the query inputs. Default to True. col_order (Iterable[str]):
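Example usage of the new wildcard-table support (a minimal sketch, not part of the patch: it assumes an authenticated BigQuery session and the public bigquery-public-data.noaa_gsod tables exercised by the system tests above; the expected shapes are the ones asserted in those tests):

    import bigframes.pandas as bpd

    # Read every table matching the wildcard prefix as a single DataFrame.
    df_1930s = bpd.read_gbq("bigquery-public-data.noaa_gsod.gsod193*")

    # Same tables selected via a broader wildcard narrowed with the
    # _table_suffix pseudo column; _to_query() compiles this into roughly:
    #   SELECT * FROM `bigquery-public-data.noaa_gsod.gsod19*` AS sub
    #   WHERE `_table_suffix` >= '30' AND `_table_suffix` <= '39'
    df_filtered = bpd.read_gbq(
        "bigquery-public-data.noaa_gsod.gsod19*",
        filters=[("_table_suffix", ">=", "30"), ("_table_suffix", "<=", "39")],
    )

    print(df_1930s.shape)     # (348485, 32), per the system tests above
    print(df_filtered.shape)  # same tables selected, so the same shape

read_gbq_table accepts the same wildcard path and filters arguments after this change.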