From e4efcf758736d815adab211907369c3f0167c2c4 Mon Sep 17 00:00:00 2001
From: Ashley Xu
Date: Tue, 21 Nov 2023 18:06:23 +0000
Subject: [PATCH 01/11] feat: add the recent api method for ML component

---
 bigframes/ml/cluster.py       | 2 ++
 bigframes/ml/compose.py       | 2 ++
 bigframes/ml/decomposition.py | 2 ++
 bigframes/ml/ensemble.py      | 5 +++++
 bigframes/ml/forecasting.py   | 2 ++
 bigframes/ml/imported.py      | 3 +++
 bigframes/ml/linear_model.py  | 3 +++
 bigframes/ml/llm.py           | 4 +++-
 bigframes/ml/pipeline.py      | 2 ++
 bigframes/ml/preprocessing.py | 7 +++++++
 10 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/bigframes/ml/cluster.py b/bigframes/ml/cluster.py
index c9f52ba0b6..6b79d356a2 100644
--- a/bigframes/ml/cluster.py
+++ b/bigframes/ml/cluster.py
@@ -22,11 +22,13 @@
 from google.cloud import bigquery
 
 import bigframes
+from bigframes.core import log_adapter
 from bigframes.ml import base, core, globals, utils
 import bigframes.pandas as bpd
 import third_party.bigframes_vendored.sklearn.cluster._kmeans
 
 
+@log_adapter.class_logger
 class KMeans(
     base.UnsupervisedTrainablePredictor,
     third_party.bigframes_vendored.sklearn.cluster._kmeans.KMeans,
diff --git a/bigframes/ml/compose.py b/bigframes/ml/compose.py
index bf046ff691..ace876dd2d 100644
--- a/bigframes/ml/compose.py
+++ b/bigframes/ml/compose.py
@@ -22,6 +22,7 @@
 from typing import List, Optional, Tuple, Union
 
 from bigframes import constants
+from bigframes.core import log_adapter
 from bigframes.ml import base, core, globals, preprocessing, utils
 import bigframes.pandas as bpd
 import third_party.bigframes_vendored.sklearn.compose._column_transformer
@@ -36,6 +37,7 @@
 ]
 
 
+@log_adapter.class_logger
 class ColumnTransformer(
     base.Transformer,
     third_party.bigframes_vendored.sklearn.compose._column_transformer.ColumnTransformer,
diff --git a/bigframes/ml/decomposition.py b/bigframes/ml/decomposition.py
index 7cda7a6993..ef777cb33a 100644
--- a/bigframes/ml/decomposition.py
+++ b/bigframes/ml/decomposition.py
@@ -22,11 +22,13 @@
 from google.cloud import bigquery
 
 import bigframes
+from bigframes.core import log_adapter
 from bigframes.ml import base, core, globals, utils
 import bigframes.pandas as bpd
 import third_party.bigframes_vendored.sklearn.decomposition._pca
 
 
+@log_adapter.class_logger
 class PCA(
     base.UnsupervisedTrainablePredictor,
     third_party.bigframes_vendored.sklearn.decomposition._pca.PCA,
diff --git a/bigframes/ml/ensemble.py b/bigframes/ml/ensemble.py
index fcb3fe5343..1cc9fb3739 100644
--- a/bigframes/ml/ensemble.py
+++ b/bigframes/ml/ensemble.py
@@ -22,6 +22,7 @@
 from google.cloud import bigquery
 
 import bigframes
+from bigframes.core import log_adapter
 from bigframes.ml import base, core, globals, utils
 import bigframes.pandas as bpd
 import third_party.bigframes_vendored.sklearn.ensemble._forest
@@ -47,6 +48,7 @@
 }
 
 
+@log_adapter.class_logger
 class XGBRegressor(
     base.SupervisedTrainablePredictor,
     third_party.bigframes_vendored.xgboost.sklearn.XGBRegressor,
@@ -202,6 +204,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> XGBRegressor:
         return new_model.session.read_gbq_model(model_name)
 
 
+@log_adapter.class_logger
 class XGBClassifier(
     base.SupervisedTrainablePredictor,
     third_party.bigframes_vendored.xgboost.sklearn.XGBClassifier,
@@ -356,6 +359,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> XGBClassifier:
         return new_model.session.read_gbq_model(model_name)
 
 
+@log_adapter.class_logger
 class RandomForestRegressor(
     base.SupervisedTrainablePredictor,
     third_party.bigframes_vendored.sklearn.ensemble._forest.RandomForestRegressor,
@@ -521,6 +525,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> RandomForestRegresso
         return new_model.session.read_gbq_model(model_name)
 
 
+@log_adapter.class_logger
 class RandomForestClassifier(
     base.SupervisedTrainablePredictor,
     third_party.bigframes_vendored.sklearn.ensemble._forest.RandomForestClassifier,
diff --git a/bigframes/ml/forecasting.py b/bigframes/ml/forecasting.py
index cf23854fa0..995201062b 100644
--- a/bigframes/ml/forecasting.py
+++ b/bigframes/ml/forecasting.py
@@ -21,10 +21,12 @@
 from google.cloud import bigquery
 
 import bigframes
+from bigframes.core import log_adapter
 from bigframes.ml import base, core, globals, utils
 import bigframes.pandas as bpd
 
 
+@log_adapter.class_logger
 class ARIMAPlus(base.SupervisedTrainablePredictor):
     """Time Series ARIMA Plus model."""
diff --git a/bigframes/ml/imported.py b/bigframes/ml/imported.py
index f6afc9aa38..4ae0a8ea4d 100644
--- a/bigframes/ml/imported.py
+++ b/bigframes/ml/imported.py
@@ -21,10 +21,12 @@
 from google.cloud import bigquery
 
 import bigframes
+from bigframes.core import log_adapter
 from bigframes.ml import base, core, globals, utils
 import bigframes.pandas as bpd
 
 
+@log_adapter.class_logger
 class TensorFlowModel(base.Predictor):
     """Imported TensorFlow model.
 
@@ -101,6 +103,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> TensorFlowModel:
         return new_model.session.read_gbq_model(model_name)
 
 
+@log_adapter.class_logger
 class ONNXModel(base.Predictor):
     """Imported Open Neural Network Exchange (ONNX) model.
diff --git a/bigframes/ml/linear_model.py b/bigframes/ml/linear_model.py
index 433d9fbc38..5ee87b8850 100644
--- a/bigframes/ml/linear_model.py
+++ b/bigframes/ml/linear_model.py
@@ -23,6 +23,7 @@
 
 import bigframes
 import bigframes.constants as constants
+from bigframes.core import log_adapter
 from bigframes.ml import base, core, globals, utils
 import bigframes.pandas as bpd
 import third_party.bigframes_vendored.sklearn.linear_model._base
@@ -46,6 +47,7 @@
 }
 
 
+@log_adapter.class_logger
 class LinearRegression(
     base.SupervisedTrainablePredictor,
     third_party.bigframes_vendored.sklearn.linear_model._base.LinearRegression,
@@ -178,6 +180,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> LinearRegression:
         return new_model.session.read_gbq_model(model_name)
 
 
+@log_adapter.class_logger
 class LogisticRegression(
     base.SupervisedTrainablePredictor,
     third_party.bigframes_vendored.sklearn.linear_model._logistic.LogisticRegression,
diff --git a/bigframes/ml/llm.py b/bigframes/ml/llm.py
index 78f3369daf..5beb54a32d 100644
--- a/bigframes/ml/llm.py
+++ b/bigframes/ml/llm.py
@@ -21,7 +21,7 @@
 
 import bigframes
 from bigframes import clients, constants
-from bigframes.core import blocks
+from bigframes.core import blocks, log_adapter
 from bigframes.ml import base, core, globals, utils
 import bigframes.pandas as bpd
 
@@ -43,6 +43,7 @@
 _ML_EMBED_TEXT_STATUS = "ml_embed_text_status"
 
 
+@log_adapter.class_logger
 class PaLM2TextGenerator(base.Predictor):
     """PaLM2 text generator LLM model.
 
@@ -200,6 +201,7 @@ def predict(
         return df
 
 
+@log_adapter.class_logger
 class PaLM2TextEmbeddingGenerator(base.Predictor):
     """PaLM2 text embedding generator LLM model.
diff --git a/bigframes/ml/pipeline.py b/bigframes/ml/pipeline.py
index ad0b3fae11..4ae2bfe555 100644
--- a/bigframes/ml/pipeline.py
+++ b/bigframes/ml/pipeline.py
@@ -24,11 +24,13 @@
 
 import bigframes
 import bigframes.constants as constants
+from bigframes.core import log_adapter
 from bigframes.ml import base, compose, forecasting, loader, preprocessing, utils
 import bigframes.pandas as bpd
 import third_party.bigframes_vendored.sklearn.pipeline
 
 
+@log_adapter.class_logger
 class Pipeline(
     base.BaseEstimator,
     third_party.bigframes_vendored.sklearn.pipeline.Pipeline,
diff --git a/bigframes/ml/preprocessing.py b/bigframes/ml/preprocessing.py
index 5f44d40218..a403e57e71 100644
--- a/bigframes/ml/preprocessing.py
+++ b/bigframes/ml/preprocessing.py
@@ -20,6 +20,7 @@
 import typing
 from typing import Any, cast, List, Literal, Optional, Tuple, Union
 
+from bigframes.core import log_adapter
 from bigframes.ml import base, core, globals, utils
 import bigframes.pandas as bpd
 import third_party.bigframes_vendored.sklearn.preprocessing._data
@@ -28,6 +29,7 @@
 import third_party.bigframes_vendored.sklearn.preprocessing._label
 
 
+@log_adapter.class_logger
 class StandardScaler(
     base.Transformer,
     third_party.bigframes_vendored.sklearn.preprocessing._data.StandardScaler,
@@ -111,6 +113,7 @@ def transform(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame:
         )
 
 
+@log_adapter.class_logger
 class MaxAbsScaler(
     base.Transformer,
     third_party.bigframes_vendored.sklearn.preprocessing._data.MaxAbsScaler,
@@ -194,6 +197,7 @@ def transform(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame:
         )
 
 
+@log_adapter.class_logger
 class MinMaxScaler(
     base.Transformer,
     third_party.bigframes_vendored.sklearn.preprocessing._data.MinMaxScaler,
@@ -277,6 +281,7 @@ def transform(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame:
         )
 
 
+@log_adapter.class_logger
 class KBinsDiscretizer(
     base.Transformer,
     third_party.bigframes_vendored.sklearn.preprocessing._discretization.KBinsDiscretizer,
@@ -395,6 +400,7 @@ def transform(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame:
         )
 
 
+@log_adapter.class_logger
 class OneHotEncoder(
     base.Transformer,
     third_party.bigframes_vendored.sklearn.preprocessing._encoder.OneHotEncoder,
@@ -524,6 +530,7 @@ def transform(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame:
         )
 
 
+@log_adapter.class_logger
 class LabelEncoder(
     base.LabelTransformer,
     third_party.bigframes_vendored.sklearn.preprocessing._label.LabelEncoder,
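
The `log_adapter.class_logger` decorator applied throughout this patch is defined in `bigframes/core/log_adapter.py`, which is not part of this diff. Below is a minimal sketch of how such a decorator can work, assuming it records each public method call for later use in BigQuery job labels. Only `get_and_reset_api_methods`, which the tests in PATCH 02 call, is confirmed by this series; every other name is illustrative:

    # Hypothetical sketch; the real bigframes.core.log_adapter may differ.
    import functools
    from typing import List

    _api_methods: List[str] = []

    def class_logger(decorated_cls):
        """Wrap every public method of decorated_cls to record its calls."""
        for name, attr in list(vars(decorated_cls).items()):
            if callable(attr) and not name.startswith("_"):
                setattr(decorated_cls, name, _wrap(decorated_cls, attr))
        return decorated_cls

    def _wrap(decorated_cls, method):
        @functools.wraps(method)
        def wrapper(*args, **kwargs):
            # Record calls such as "kmeans-fit" for the next job's labels.
            _api_methods.append(
                f"{decorated_cls.__name__.lower()}-{method.__name__}"
            )
            return method(*args, **kwargs)
        return wrapper

    def get_and_reset_api_methods() -> List[str]:
        """Drain the recorded method names, e.g. to attach them as labels."""
        methods = _api_methods.copy()
        _api_methods.clear()
        return methods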
From 2d5a85b81afb82d9ec9acad41a91ae8c5bc9a5bb Mon Sep 17 00:00:00 2001
From: Ashley Xu
Date: Tue, 21 Nov 2023 19:05:35 +0000
Subject: [PATCH 02/11] fix: fix unit test failure

---
 tests/unit/session/test_io_bigquery.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py
index e1481d3f05..c87835c412 100644
--- a/tests/unit/session/test_io_bigquery.py
+++ b/tests/unit/session/test_io_bigquery.py
@@ -59,6 +59,7 @@ def test_create_job_configs_labels_length_limit_not_met():
 
 
 def test_create_job_configs_labels_log_adaptor_call_method_under_length_limit():
+    log_adapter.get_and_reset_api_methods()
     cur_labels = {
         "bigframes-api": "read_pandas",
         "source": "bigquery-dataframes-temp",
@@ -87,6 +88,7 @@ def test_create_job_configs_labels_log_adaptor_call_method_under_length_limit():
 
 
 def test_create_job_configs_labels_length_limit_met_and_labels_is_none():
+    log_adapter.get_and_reset_api_methods()
     df = bpd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
     # Test running methods more than the labels' length limit
     for i in range(66):
@@ -102,6 +104,7 @@ def test_create_job_configs_labels_length_limit_met_and_labels_is_none():
 
 
 def test_create_job_configs_labels_length_limit_met():
+    log_adapter.get_and_reset_api_methods()
     cur_labels = {
         "bigframes-api": "read_pandas",
         "source": "bigquery-dataframes-temp",
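
The failure fixed here comes from module-level state: the api-method log in `log_adapter` is shared across tests, so methods recorded by earlier tests leaked into the labels these tests assert on. Calling `get_and_reset_api_methods()` first drains that log. A hypothetical pytest fixture expressing the same isolation, shown only for illustration (the patch keeps the reset inline instead):

    import pytest

    from bigframes.core import log_adapter

    @pytest.fixture(autouse=True)
    def _reset_recorded_api_methods():
        # Drain any method names recorded by previously-run tests so each
        # test starts from an empty log.
        log_adapter.get_and_reset_api_methods()
        yield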
From 862a2afb49c239a5f48caf74e4e98b330fb6ccf8 Mon Sep 17 00:00:00 2001
From: Ashley Xu
Date: Thu, 23 Nov 2023 00:10:31 +0000
Subject: [PATCH 03/11] fix: use the oldest timestamp for same table lookup

---
 bigframes/session/__init__.py          | 33 ++++++++++----------
 bigframes/session/_io/bigquery.py      |  5 ----
 tests/system/small/test_session.py     | 18 ++++++++++++++
 tests/unit/session/test_io_bigquery.py | 14 ----------
 4 files changed, 31 insertions(+), 39 deletions(-)

diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 928123ce74..949084cdea 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -177,6 +177,7 @@ def __init__(
         # Now that we're starting the session, don't allow the options to be
         # changed.
         context._session_started = True
+        self._df_snapshot: Dict[bigquery.TableReference, datetime.datetime] = {}
 
     @property
     def bqclient(self):
@@ -462,19 +463,6 @@ def _get_snapshot_sql_and_primary_key(
         column(s), then return those too so that ordering generation can be
        avoided.
         """
-        if table_ref.dataset_id.upper() == "_SESSION":
-            # _SESSION tables aren't supported by the tables.get REST API.
-            return (
-                self.ibis_client.sql(
-                    f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`"
-                ),
-                None,
-            )
-        table_expression = self.ibis_client.table(
-            table_ref.table_id,
-            database=f"{table_ref.project}.{table_ref.dataset_id}",
-        )
-
         # If there are primary keys defined, the query engine assumes these
         # columns are unique, even if the constraint is not enforced. We make
         # the same assumption and use these columns as the total ordering keys.
@@ -495,14 +483,19 @@ def _get_snapshot_sql_and_primary_key(
 
         job_config = bigquery.QueryJobConfig()
         job_config.labels["bigframes-api"] = api_name
-        current_timestamp = list(
-            self.bqclient.query(
-                "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
-                job_config=job_config,
-            ).result()
-        )[0][0]
+
+        if table_ref in self._df_snapshot.keys():
+            snapshot_timestamp = self._df_snapshot[table_ref]
+        else:
+            snapshot_timestamp = list(
+                self.bqclient.query(
+                    "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
+                    job_config=job_config,
+                ).result()
+            )[0][0]
+            self._df_snapshot[table_ref] = snapshot_timestamp
         table_expression = self.ibis_client.sql(
-            bigframes_io.create_snapshot_sql(table_ref, current_timestamp)
+            bigframes_io.create_snapshot_sql(table_ref, snapshot_timestamp)
         )
 
         return table_expression, primary_keys
diff --git a/bigframes/session/_io/bigquery.py b/bigframes/session/_io/bigquery.py
index dae73301e7..4770f12089 100644
--- a/bigframes/session/_io/bigquery.py
+++ b/bigframes/session/_io/bigquery.py
@@ -117,11 +117,6 @@ def create_snapshot_sql(
     table_ref: bigquery.TableReference, current_timestamp: datetime.datetime
 ) -> str:
     """Query a table via 'time travel' for consistent reads."""
-
-    # If we have a _SESSION table, assume that it's already a copy. Nothing to do here.
-    if table_ref.dataset_id.upper() == "_SESSION":
-        return f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`"
-
     # If we have an anonymous query results table, it can't be modified and
     # there isn't any BigQuery time travel.
     if table_ref.dataset_id.startswith("_"):
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index 7cd9f1dd59..26c5093b35 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -16,6 +16,7 @@
 import random
 import tempfile
 import textwrap
+import time
 import typing
 from typing import List
 
@@ -308,6 +309,23 @@ def test_read_gbq_w_script_no_select(session, dataset_id: str):
     assert df["statement_type"][0] == "SCRIPT"
 
 
+def test_read_gbq_twice_with_same_timestamp(session, penguins_table_id):
+    df1 = session.read_gbq(penguins_table_id)
+    time.sleep(1)
+    df2 = session.read_gbq(penguins_table_id)
+    df1.columns = [
+        "species1",
+        "island1",
+        "culmen_length_mm1",
+        "culmen_depth_mm1",
+        "flipper_length_mm1",
+        "body_mass_g1",
+        "sex1",
+    ]
+    df3 = df1.join(df2)
+    assert df3 is not None
+
+
 def test_read_gbq_model(session, penguins_linear_model_name):
     model = session.read_gbq_model(penguins_linear_model_name)
     assert isinstance(model, bigframes.ml.linear_model.LinearRegression)
diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py
index c87835c412..3f3bfbe7d3 100644
--- a/tests/unit/session/test_io_bigquery.py
+++ b/tests/unit/session/test_io_bigquery.py
@@ -147,20 +147,6 @@ def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets():
     assert "`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql
 
 
-def test_create_snapshot_sql_doesnt_timetravel_session_tables():
-    table_ref = bigquery.TableReference.from_string("my-test-project._session.abcdefg")
-
-    sql = bigframes.session._io.bigquery.create_snapshot_sql(
-        table_ref, datetime.datetime.now(datetime.timezone.utc)
-    )
-
-    # We aren't modifying _SESSION tables, so don't use time travel.
-    assert "SYSTEM_TIME" not in sql
-
-    # Don't need the project ID for _SESSION tables.
-    assert "my-test-project" not in sql
-
-
 def test_create_temp_table_default_expiration():
     """Make sure the created table has an expiration."""
     bqclient = mock.create_autospec(bigquery.Client)
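
For context, `create_snapshot_sql` builds a BigQuery time-travel query, so caching the first `CURRENT_TIMESTAMP()` per table means every later read of that table resolves to the same snapshot, which is what lets the self-join in the new system test line up row for row. A sketch of the kind of SQL generated (the exact formatting in `bigframes/session/_io/bigquery.py` may differ):

    import datetime

    from google.cloud import bigquery

    def snapshot_sql_sketch(
        table_ref: bigquery.TableReference, timestamp: datetime.datetime
    ) -> str:
        # FOR SYSTEM_TIME AS OF pins the read to one point in time, so two
        # reads that reuse the cached timestamp see identical rows.
        return (
            f"SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`"
            f".`{table_ref.table_id}` FOR SYSTEM_TIME AS OF "
            f"TIMESTAMP({repr(timestamp.isoformat())})"
        )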
From 915d7d367435a435250dc4c66a385b416540fbd3 Mon Sep 17 00:00:00 2001
From: Ashley Xu
Date: Fri, 24 Nov 2023 00:28:22 +0000
Subject: [PATCH 04/11] Give the users the option to clear the table cached
 snapshot

---
 bigframes/session/__init__.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 949084cdea..e12a2fede1 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -214,6 +214,14 @@ def __hash__(self):
         # Stable hash needed to use in expression tree
         return hash(str(self._anonymous_dataset))
 
+    def clear_cache(self, table: str):
+        """Clear the cached snapshot for the table reference."""
+        table_ref = bigquery.table.TableReference.from_string(
+            table, default_project=self.bqclient.project
+        )
+        if table_ref in self._df_snapshot:
+            self._df_snapshot.pop(table_ref)
+
     def _create_bq_datasets(self):
         """Create and identify dataset(s) for temporary BQ resources."""
         query_job = self.bqclient.query("SELECT 1", location=self._location)
@@ -511,8 +519,6 @@ def _read_gbq_table(
         if max_results and max_results <= 0:
             raise ValueError("`max_results` should be a positive number.")
 
-        # TODO(swast): Can we re-use the temp table from other reads in the
-        # session, if the original table wasn't modified?
         table_ref = bigquery.table.TableReference.from_string(
             query, default_project=self.bqclient.project
         )

From 995eb468b6db65c3a020b7d2556bf7dde618680b Mon Sep 17 00:00:00 2001
From: Ashley Xu
Date: Mon, 27 Nov 2023 18:08:35 +0000
Subject: [PATCH 05/11] fix: add docstring to pandas

---
 bigframes/pandas/__init__.py | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index d35f838366..8fa1603b6f 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -122,6 +122,16 @@ def concat(
 concat.__doc__ = vendored_pandas_concat.concat.__doc__
 
 
+def clear_cache(table: str):
+    return global_session.with_default_session(
+        bigframes.session.Session.clear_cache,
+        table=table,
+    )
+
+
+clear_cache.__doc__ = bigframes.session.Session.clear_cache.__doc__
+
+
 def cut(
     x: bigframes.series.Series,
     bins: int,
@@ -652,7 +662,7 @@ def read_gbq_function(function_name: str):
 # Use __all__ to let type checkers know what is part of the public API.
 __all___ = [
     # Functions
-    "concat",
+    "clear_cache" "concat",
     "merge",
     "read_csv",
     "read_gbq",
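
Note that the `__all___` hunk above introduces a subtle bug, which PATCH 09 later reverts: without a comma between the literals, Python concatenates adjacent strings at compile time, so the two intended entries collapse into one bogus name. A minimal demonstration:

    # Adjacent string literals are implicitly concatenated, so the missing
    # comma fuses two intended entries into a single name.
    exports = [
        "clear_cache" "concat",
        "merge",
    ]
    assert exports == ["clear_cacheconcat", "merge"]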
From 6a07ed065dbd2c46947d526313ee90f047ee22f3 Mon Sep 17 00:00:00 2001
From: Ashley Xu
Date: Mon, 27 Nov 2023 19:36:38 +0000
Subject: [PATCH 06/11] add use_cache param for read_gbq_table

---
 bigframes/pandas/__init__.py                  | 14 +++-----
 bigframes/session/__init__.py                 | 33 +++++++++++--------
 .../bigframes_vendored/pandas/io/gbq.py       |  3 ++
 3 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index 8fa1603b6f..ecf62f9220 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -122,16 +122,6 @@ def concat(
 concat.__doc__ = vendored_pandas_concat.concat.__doc__
 
 
-def clear_cache(table: str):
-    return global_session.with_default_session(
-        bigframes.session.Session.clear_cache,
-        table=table,
-    )
-
-
-clear_cache.__doc__ = bigframes.session.Session.clear_cache.__doc__
-
-
 def cut(
     x: bigframes.series.Series,
     bins: int,
@@ -496,6 +486,7 @@ def read_gbq(
     index_col: Iterable[str] | str = (),
     col_order: Iterable[str] = (),
     max_results: Optional[int] = None,
+    use_cache: bool = True,
 ) -> bigframes.dataframe.DataFrame:
     _set_default_session_location_if_possible(query_or_table)
     return global_session.with_default_session(
@@ -504,6 +495,7 @@ def read_gbq(
         index_col=index_col,
         col_order=col_order,
         max_results=max_results,
+        use_cache=use_cache,
     )
 
 
@@ -546,6 +538,7 @@ def read_gbq_table(
     index_col: Iterable[str] | str = (),
     col_order: Iterable[str] = (),
     max_results: Optional[int] = None,
+    use_cache: bool = True,
 ) -> bigframes.dataframe.DataFrame:
     _set_default_session_location_if_possible(query)
     return global_session.with_default_session(
@@ -554,6 +547,7 @@ def read_gbq_table(
         index_col=index_col,
         col_order=col_order,
         max_results=max_results,
+        use_cache=use_cache,
     )
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index e12a2fede1..a2771cc2de 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -214,14 +214,6 @@ def __hash__(self):
         # Stable hash needed to use in expression tree
         return hash(str(self._anonymous_dataset))
 
-    def clear_cache(self, table: str):
-        """Clear the cached snapshot for the table reference."""
-        table_ref = bigquery.table.TableReference.from_string(
-            table, default_project=self.bqclient.project
-        )
-        if table_ref in self._df_snapshot:
-            self._df_snapshot.pop(table_ref)
-
     def _create_bq_datasets(self):
         """Create and identify dataset(s) for temporary BQ resources."""
         query_job = self.bqclient.query("SELECT 1", location=self._location)
@@ -255,6 +247,7 @@ def read_gbq(
         index_col: Iterable[str] | str = (),
         col_order: Iterable[str] = (),
         max_results: Optional[int] = None,
+        use_cache: bool = True,
         # Add a verify index argument that fails if the index is not unique.
     ) -> dataframe.DataFrame:
         # TODO(b/281571214): Generate prompt to show the progress of read_gbq.
@@ -276,6 +269,7 @@ def read_gbq(
                 col_order=col_order,
                 max_results=max_results,
                 api_name="read_gbq",
+                use_cache=use_cache,
             )
 
     def _query_to_destination(
@@ -435,6 +429,7 @@ def read_gbq_table(
         index_col: Iterable[str] | str = (),
         col_order: Iterable[str] = (),
         max_results: Optional[int] = None,
+        use_cache: bool = True,
     ) -> dataframe.DataFrame:
         """Turn a BigQuery table into a DataFrame.
 
@@ -457,6 +452,7 @@ def read_gbq_table(
             col_order=col_order,
             max_results=max_results,
             api_name="read_gbq_table",
+            use_cache=use_cache,
         )
 
     def _get_snapshot_sql_and_primary_key(
@@ -464,6 +460,7 @@ def _get_snapshot_sql_and_primary_key(
         table_ref: bigquery.table.TableReference,
         *,
         api_name: str,
+        use_cache: bool = True,
     ) -> Tuple[ibis_types.Table, Optional[Sequence[str]]]:
         """Create a read-only Ibis table expression representing a table.
 
@@ -491,9 +488,17 @@ def _get_snapshot_sql_and_primary_key(
 
         job_config = bigquery.QueryJobConfig()
         job_config.labels["bigframes-api"] = api_name
-
-        if table_ref in self._df_snapshot.keys():
-            snapshot_timestamp = self._df_snapshot[table_ref]
+        if use_cache:
+            if table_ref in self._df_snapshot.keys():
+                snapshot_timestamp = self._df_snapshot[table_ref]
+            else:
+                snapshot_timestamp = list(
+                    self.bqclient.query(
+                        "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
+                        job_config=job_config,
+                    ).result()
+                )[0][0]
+                self._df_snapshot[table_ref] = snapshot_timestamp
         else:
             snapshot_timestamp = list(
                 self.bqclient.query(
@@ -501,7 +506,6 @@ def _get_snapshot_sql_and_primary_key(
                    job_config=job_config,
                 ).result()
             )[0][0]
-            self._df_snapshot[table_ref] = snapshot_timestamp
         table_expression = self.ibis_client.sql(
             bigframes_io.create_snapshot_sql(table_ref, snapshot_timestamp)
         )
 
         return table_expression, primary_keys
@@ -515,6 +519,7 @@ def _read_gbq_table(
         col_order: Iterable[str] = (),
         max_results: Optional[int] = None,
         api_name: str,
+        use_cache: bool = True,
     ) -> dataframe.DataFrame:
         if max_results and max_results <= 0:
             raise ValueError("`max_results` should be a positive number.")
@@ -526,7 +531,9 @@ def _read_gbq_table(
         (
             table_expression,
             total_ordering_cols,
-        ) = self._get_snapshot_sql_and_primary_key(table_ref, api_name=api_name)
+        ) = self._get_snapshot_sql_and_primary_key(
+            table_ref, api_name=api_name, use_cache=use_cache
+        )
 
         for key in col_order:
             if key not in table_expression.columns:
diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py
index 2161310b07..eabb48e600 100644
--- a/third_party/bigframes_vendored/pandas/io/gbq.py
+++ b/third_party/bigframes_vendored/pandas/io/gbq.py
@@ -16,6 +16,7 @@ def read_gbq(
     index_col: Iterable[str] | str = (),
     col_order: Iterable[str] = (),
     max_results: Optional[int] = None,
+    use_cache: bool = True,
 ):
     """Loads a DataFrame from BigQuery.
 
@@ -83,6 +84,8 @@ def read_gbq(
         max_results (Optional[int], default None):
             If set, limit the maximum number of rows to fetch from the query
            results.
+        use_cache (bool, default True):
+            Whether to cache the query inputs. Defaults to True.
 
     Returns:
         bigframes.dataframe.DataFrame: A DataFrame representing results of the
             query or table.
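
After this patch, callers can opt out of the per-session snapshot cache. A hypothetical usage sketch (the table name is illustrative):

    import bigframes.pandas as bpd

    # The first read caches a CURRENT_TIMESTAMP() snapshot for this table.
    df1 = bpd.read_gbq("my-project.my_dataset.my_table")

    # Reuses the cached snapshot: sees exactly the same rows as df1.
    df2 = bpd.read_gbq("my-project.my_dataset.my_table")

    # Bypasses the cache: queries a fresh CURRENT_TIMESTAMP() and may
    # observe rows written after df1 was created.
    df3 = bpd.read_gbq("my-project.my_dataset.my_table", use_cache=False)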
From 39e1624f26c7e46c4a4b574dd661f0f82ac7f77a Mon Sep 17 00:00:00 2001
From: Ashley Xu
Date: Mon, 27 Nov 2023 19:40:05 +0000
Subject: [PATCH 07/11] reinforce running

From 68da750e32ac6e51ff050372f2280f26e82ba57f Mon Sep 17 00:00:00 2001
From: Ashley Xu
Date: Mon, 27 Nov 2023 23:02:14 +0000
Subject: [PATCH 08/11] allow empty to trigger run again

From 76df8fbd189169bc7db9e2bcc4c2754777e119c1 Mon Sep 17 00:00:00 2001
From: Ashley Xu
Date: Tue, 28 Nov 2023 18:34:47 +0000
Subject: [PATCH 09/11] address comments

---
 bigframes/pandas/__init__.py  |  4 +++-
 bigframes/session/__init__.py | 22 +++++++++++-----------
 2 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index ecf62f9220..0c2c1f87aa 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -518,6 +518,7 @@ def read_gbq_query(
     index_col: Iterable[str] | str = (),
     col_order: Iterable[str] = (),
     max_results: Optional[int] = None,
+    use_cache: bool = True,
 ) -> bigframes.dataframe.DataFrame:
     _set_default_session_location_if_possible(query)
     return global_session.with_default_session(
@@ -526,6 +527,7 @@ def read_gbq_query(
         index_col=index_col,
         col_order=col_order,
         max_results=max_results,
+        use_cache=use_cache,
     )
 
 
@@ -656,7 +658,7 @@ def read_gbq_function(function_name: str):
 # Use __all__ to let type checkers know what is part of the public API.
 __all___ = [
     # Functions
-    "clear_cache" "concat",
+    "concat",
     "merge",
     "read_csv",
     "read_gbq",
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index a2771cc2de..75cba97d77 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -258,6 +258,7 @@ def read_gbq(
                 col_order=col_order,
                 max_results=max_results,
                 api_name="read_gbq",
+                use_cache=use_cache,
             )
         else:
             # TODO(swast): Query the snapshot table but mark it as a
@@ -322,6 +323,7 @@ def read_gbq_query(
         index_col: Iterable[str] | str = (),
         col_order: Iterable[str] = (),
         max_results: Optional[int] = None,
+        use_cache: bool = True,
     ) -> dataframe.DataFrame:
         """Turn a SQL query into a DataFrame.
 
@@ -379,6 +381,7 @@ def read_gbq_query(
             col_order=col_order,
             max_results=max_results,
             api_name="read_gbq_query",
+            use_cache=use_cache,
         )
 
     def _read_gbq_query(
@@ -389,6 +392,7 @@ def _read_gbq_query(
         col_order: Iterable[str] = (),
         max_results: Optional[int] = None,
         api_name: str = "read_gbq_query",
+        use_cache: bool = True,
     ) -> dataframe.DataFrame:
         if isinstance(index_col, str):
             index_cols = [index_col]
@@ -398,6 +402,8 @@ def _read_gbq_query(
         destination, query_job = self._query_to_destination(
             query, index_cols, api_name=api_name
         )
+        if query_job is not None:
+            query_job.use_query_cache = use_cache
 
         # If there was no destination table, that means the query must have
         # been DDL or DML. Return some job metadata, instead.
@@ -416,6 +426,7 @@ def _read_gbq_query(
             index_col=index_cols,
             col_order=col_order,
             max_results=max_results,
+            use_cache=use_cache,
         )
 
     def read_gbq_table(
@@ -488,17 +495,9 @@ def _get_snapshot_sql_and_primary_key(
 
         job_config = bigquery.QueryJobConfig()
         job_config.labels["bigframes-api"] = api_name
-        if use_cache:
-            if table_ref in self._df_snapshot.keys():
-                snapshot_timestamp = self._df_snapshot[table_ref]
-            else:
-                snapshot_timestamp = list(
-                    self.bqclient.query(
-                        "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
-                        job_config=job_config,
-                    ).result()
-                )[0][0]
-                self._df_snapshot[table_ref] = snapshot_timestamp
+        job_config.use_query_cache = use_cache
+        if use_cache and table_ref in self._df_snapshot.keys():
+            snapshot_timestamp = self._df_snapshot[table_ref]
         else:
             snapshot_timestamp = list(
                 self.bqclient.query(
@@ -506,6 +505,7 @@ def _get_snapshot_sql_and_primary_key(
                     job_config=job_config,
                 ).result()
             )[0][0]
+            self._df_snapshot[table_ref] = snapshot_timestamp
         table_expression = self.ibis_client.sql(
             bigframes_io.create_snapshot_sql(table_ref, snapshot_timestamp)
         )
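
The `use_query_cache` flag set on `QueryJobConfig` here is the standard google-cloud-bigquery control for BigQuery's own query result cache, which is separate from this session's snapshot-timestamp cache. It must be set before the query runs; assigning it to an already-returned `QueryJob`, as the interim `_read_gbq_query` change above does, cannot affect the issued query, and PATCH 10 removes that assignment. For reference:

    from google.cloud import bigquery

    client = bigquery.Client()

    # use_query_cache belongs on the job config, before the query runs;
    # False forces BigQuery to recompute instead of serving cached results.
    job_config = bigquery.QueryJobConfig(use_query_cache=False)
    job = client.query(
        "SELECT CURRENT_TIMESTAMP() AS ts", job_config=job_config
    )
    print(list(job.result())[0]["ts"])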
@@ -420,6 +426,7 @@ def _read_gbq_query( index_col=index_cols, col_order=col_order, max_results=max_results, + use_cache=use_cache, ) def read_gbq_table( @@ -488,17 +495,9 @@ def _get_snapshot_sql_and_primary_key( job_config = bigquery.QueryJobConfig() job_config.labels["bigframes-api"] = api_name - if use_cache: - if table_ref in self._df_snapshot.keys(): - snapshot_timestamp = self._df_snapshot[table_ref] - else: - snapshot_timestamp = list( - self.bqclient.query( - "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", - job_config=job_config, - ).result() - )[0][0] - self._df_snapshot[table_ref] = snapshot_timestamp + job_config.use_query_cache = use_cache + if use_cache and table_ref in self._df_snapshot.keys(): + snapshot_timestamp = self._df_snapshot[table_ref] else: snapshot_timestamp = list( self.bqclient.query( @@ -506,6 +505,7 @@ def _get_snapshot_sql_and_primary_key( job_config=job_config, ).result() )[0][0] + self._df_snapshot[table_ref] = snapshot_timestamp table_expression = self.ibis_client.sql( bigframes_io.create_snapshot_sql(table_ref, snapshot_timestamp) ) From 79f090f3800342327cf6609cea23379b5537b871 Mon Sep 17 00:00:00 2001 From: Ashley Xu Date: Tue, 28 Nov 2023 19:28:48 +0000 Subject: [PATCH 10/11] fix the mypy failure --- bigframes/session/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 75cba97d77..63398cb764 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -402,8 +402,6 @@ def _read_gbq_query( destination, query_job = self._query_to_destination( query, index_cols, api_name=api_name ) - if query_job is not None: - query_job.use_query_cache = use_cache # If there was no destination table, that means the query must have # been DDL or DML. Return some job metadata, instead. @@ -416,6 +414,7 @@ def _read_gbq_query( ], "job_id": [query_job.job_id if query_job else "unknown"], "location": [query_job.location if query_job else "unknown"], + "use_query_cache": [use_cache], } ), session=self, From df798181c3bbe52bc973640c26515c1d519b3658 Mon Sep 17 00:00:00 2001 From: Ashley Xu Date: Tue, 28 Nov 2023 22:12:57 +0000 Subject: [PATCH 11/11] address comments --- bigframes/session/__init__.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 63398cb764..d9b27bd2ed 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -278,6 +278,7 @@ def _query_to_destination( query: str, index_cols: List[str], api_name: str, + use_cache: bool = True, ) -> Tuple[Optional[bigquery.TableReference], Optional[bigquery.QueryJob]]: # If a dry_run indicates this is not a query type job, then don't # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement. 
From df798181c3bbe52bc973640c26515c1d519b3658 Mon Sep 17 00:00:00 2001
From: Ashley Xu
Date: Tue, 28 Nov 2023 22:12:57 +0000
Subject: [PATCH 11/11] address comments

---
 bigframes/session/__init__.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 63398cb764..d9b27bd2ed 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -278,6 +278,7 @@ def _query_to_destination(
         query: str,
         index_cols: List[str],
         api_name: str,
+        use_cache: bool = True,
     ) -> Tuple[Optional[bigquery.TableReference], Optional[bigquery.QueryJob]]:
         # If a dry_run indicates this is not a query type job, then don't
         # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
@@ -302,6 +303,7 @@ def _query_to_destination(
         job_config = bigquery.QueryJobConfig()
         job_config.labels["bigframes-api"] = api_name
         job_config.destination = temp_table
+        job_config.use_query_cache = use_cache
 
         try:
             # Write to temp table to workaround BigQuery 10 GB query results
@@ -400,7 +402,10 @@ def _read_gbq_query(
             index_cols = list(index_col)
 
         destination, query_job = self._query_to_destination(
-            query, index_cols, api_name=api_name
+            query,
+            index_cols,
+            api_name=api_name,
+            use_cache=use_cache,
         )
 
         # If there was no destination table, that means the query must have
@@ -414,7 +419,6 @@ def _read_gbq_query(
                 ],
                 "job_id": [query_job.job_id if query_job else "unknown"],
                 "location": [query_job.location if query_job else "unknown"],
-                "use_query_cache": [use_cache],
             }
         ),
         session=self,
@@ -494,7 +498,6 @@ def _get_snapshot_sql_and_primary_key(
 
         job_config = bigquery.QueryJobConfig()
         job_config.labels["bigframes-api"] = api_name
-        job_config.use_query_cache = use_cache
         if use_cache and table_ref in self._df_snapshot.keys():
             snapshot_timestamp = self._df_snapshot[table_ref]
         else:
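
Taken together, the series ends with a single `use_cache` knob: `read_gbq_table` reuses one snapshot timestamp per table per session, and `read_gbq_query` forwards the flag to `QueryJobConfig.use_query_cache`. A hypothetical end-to-end sketch of the final behavior, using a public dataset common in BigQuery examples:

    import bigframes.pandas as bpd

    # Cached path: both frames resolve to the same snapshot timestamp, so
    # joining them is consistent (cf. the system test added in PATCH 03).
    df1 = bpd.read_gbq_table("bigquery-public-data.ml_datasets.penguins")
    df2 = bpd.read_gbq_table("bigquery-public-data.ml_datasets.penguins")

    # Fresh path: skips the snapshot cache and, for queries, also disables
    # BigQuery's result cache via QueryJobConfig.use_query_cache.
    latest = bpd.read_gbq_query(
        "SELECT COUNT(*) AS n"
        " FROM `bigquery-public-data.ml_datasets.penguins`",
        use_cache=False,
    )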