From 59b446bad8d2c5fca791c384616cfa7e54d54c09 Mon Sep 17 00:00:00 2001
From: Garrett Wu <6505921+GarrettWu@users.noreply.github.com>
Date: Wed, 21 Feb 2024 11:48:15 -0800
Subject: [PATCH] perf: inline read_pandas for small data (#383)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #312729021 🦕
---
 bigframes/dataframe.py                  | 16 +-----------
 bigframes/operations/base.py            | 16 +-----------
 bigframes/session/__init__.py           | 33 ++++++++++++++++++++++---
 tests/system/small/test_progress_bar.py |  9 ++++++-
 tests/unit/session/test_io_bigquery.py  | 16 ++++++++----
 5 files changed, 51 insertions(+), 39 deletions(-)

diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index ccbf68ebb5..d467239ea6 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -69,10 +69,6 @@
 import bigframes.session
 
-# BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
-# TODO(tbergeron): Convert to bytes-based limit
-MAX_INLINE_DF_SIZE = 5000
-
 LevelType = typing.Hashable
 LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
 SingleItemValue = Union[bigframes.series.Series, int, float, Callable]
 
@@ -170,17 +166,7 @@ def __init__(
                 columns=columns,  # type:ignore
                 dtype=dtype,  # type:ignore
             )
-            if (
-                pd_dataframe.size < MAX_INLINE_DF_SIZE
-                # TODO(swast): Workaround data types limitation in inline data.
-                and not any(
-                    dt.pyarrow_dtype
-                    for dt in pd_dataframe.dtypes
-                    if isinstance(dt, pandas.ArrowDtype)
-                )
-            ):
-                self._block = blocks.Block.from_local(pd_dataframe)
-            elif session:
+            if session:
                 self._block = session.read_pandas(pd_dataframe)._get_block()
             else:
                 self._block = bigframes.pandas.read_pandas(pd_dataframe)._get_block()
diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py
index 04114b43cb..154247c033 100644
--- a/bigframes/operations/base.py
+++ b/bigframes/operations/base.py
@@ -30,10 +30,6 @@
 import bigframes.session
 import third_party.bigframes_vendored.pandas.pandas._typing as vendored_pandas_typing
 
-# BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
-# TODO(tbergeron): Convert to bytes-based limit
-MAX_INLINE_SERIES_SIZE = 5000
-
 
 class SeriesMethods:
     def __init__(
@@ -104,17 +100,7 @@ def __init__(
             if pd_series.name is None:
                 # to_frame will set default numeric column label if unnamed, but we do not support int column label, so must rename
                 pd_dataframe = pd_dataframe.set_axis(["unnamed_col"], axis=1)
-            if (
-                pd_dataframe.size < MAX_INLINE_SERIES_SIZE
-                # TODO(swast): Workaround data types limitation in inline data.
-                and not any(
-                    dt.pyarrow_dtype
-                    for dt in pd_dataframe.dtypes
-                    if isinstance(dt, pd.ArrowDtype)
-                )
-            ):
-                block = blocks.Block.from_local(pd_dataframe)
-            elif session:
+            if session:
                 block = session.read_pandas(pd_dataframe)._get_block()
             else:
                 # Uses default global session
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index df0cd6e947..20dd39c0fa 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -108,6 +108,10 @@
     "UTF-32LE",
 }
 
+# BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
+# TODO(tbergeron): Convert to bytes-based limit
+MAX_INLINE_DF_SIZE = 5000
+
 logger = logging.getLogger(__name__)
 
 
@@ -882,6 +886,29 @@ def read_pandas(self, pandas_dataframe: pandas.DataFrame) -> dataframe.DataFrame
 
     def _read_pandas(
         self, pandas_dataframe: pandas.DataFrame, api_name: str
+    ) -> dataframe.DataFrame:
+        if (
+            pandas_dataframe.size < MAX_INLINE_DF_SIZE
+            # TODO(swast): Workaround data types limitation in inline data.
+            and not any(
+                (
+                    isinstance(s.dtype, pandas.ArrowDtype)
+                    or (len(s) > 0 and pandas.api.types.is_list_like(s.iloc[0]))
+                    or pandas.api.types.is_datetime64_any_dtype(s)
+                )
+                for _, s in pandas_dataframe.items()
+            )
+        ):
+            return self._read_pandas_inline(pandas_dataframe)
+        return self._read_pandas_load_job(pandas_dataframe, api_name)
+
+    def _read_pandas_inline(
+        self, pandas_dataframe: pandas.DataFrame
+    ) -> dataframe.DataFrame:
+        return dataframe.DataFrame(blocks.Block.from_local(pandas_dataframe))
+
+    def _read_pandas_load_job(
+        self, pandas_dataframe: pandas.DataFrame, api_name: str
     ) -> dataframe.DataFrame:
         col_labels, idx_labels = (
             pandas_dataframe.columns.to_list(),
@@ -1079,7 +1106,7 @@ def read_csv(
             encoding=encoding,
             **kwargs,
         )
-        return self.read_pandas(pandas_df)  # type: ignore
+        return self._read_pandas(pandas_df, "read_csv")  # type: ignore
 
     def read_pickle(
         self,
@@ -1096,7 +1123,7 @@ def read_pickle(
         if isinstance(pandas_obj, pandas.Series):
             if pandas_obj.name is None:
                 pandas_obj.name = "0"
-            bigframes_df = self.read_pandas(pandas_obj.to_frame())
+            bigframes_df = self._read_pandas(pandas_obj.to_frame(), "read_pickle")
             return bigframes_df[bigframes_df.columns[0]]
         return self._read_pandas(pandas_obj, "read_pickle")
 
@@ -1196,7 +1223,7 @@ def read_json(
                 engine=engine,
                 **kwargs,
             )
-            return self.read_pandas(pandas_df)
+            return self._read_pandas(pandas_df, "read_json")
 
     def _check_file_size(self, filepath: str):
         max_size = 1024 * 1024 * 1024  # 1 GB in bytes
diff --git a/tests/system/small/test_progress_bar.py b/tests/system/small/test_progress_bar.py
index bd13ac2240..1c04b580fc 100644
--- a/tests/system/small/test_progress_bar.py
+++ b/tests/system/small/test_progress_bar.py
@@ -15,10 +15,12 @@
 import re
 import tempfile
 
+import numpy as np
 import pandas as pd
 
 import bigframes as bf
 import bigframes.formatting_helpers as formatting_helpers
+from bigframes.session import MAX_INLINE_DF_SIZE
 
 job_load_message_regex = r"\w+ job [\w-]+ is \w+\."
 
@@ -66,10 +68,15 @@ def test_progress_bar_extract_jobs(
 
 def test_progress_bar_load_jobs(
     session: bf.Session, penguins_pandas_df_default_index: pd.DataFrame, capsys
 ):
+    # repeat the DF to be big enough to trigger the load job.
+    df = penguins_pandas_df_default_index
+    while len(df) < MAX_INLINE_DF_SIZE:
+        df = pd.DataFrame(np.repeat(df.values, 2, axis=0))
+
     bf.options.display.progress_bar = "terminal"
     with tempfile.TemporaryDirectory() as dir:
         path = dir + "/test_read_csv_progress_bar*.csv"
-        penguins_pandas_df_default_index.to_csv(path, index=False)
+        df.to_csv(path, index=False)
         capsys.readouterr()  # clear output
         session.read_csv(path)
diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py
index 96bb7bf67f..406de2b88e 100644
--- a/tests/unit/session/test_io_bigquery.py
+++ b/tests/unit/session/test_io_bigquery.py
@@ -23,6 +23,7 @@
 from bigframes.core import log_adapter
 import bigframes.pandas as bpd
 import bigframes.session._io.bigquery as io_bq
+from tests.unit import resources
 
 
 def test_create_job_configs_labels_is_none():
@@ -64,7 +65,9 @@ def test_create_job_configs_labels_log_adaptor_call_method_under_length_limit():
         "bigframes-api": "read_pandas",
         "source": "bigquery-dataframes-temp",
     }
-    df = bpd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
+    df = bpd.DataFrame(
+        {"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
+    )
     # Test running two methods
     df.head()
     df.max()
@@ -81,15 +84,16 @@ def test_create_job_configs_labels_log_adaptor_call_method_under_length_limit():
         "recent-bigframes-api-2": "dataframe-__init__",
         "recent-bigframes-api-3": "dataframe-head",
         "recent-bigframes-api-4": "dataframe-__init__",
+        "recent-bigframes-api-5": "dataframe-__init__",
     }
-    assert labels is not None
-    assert len(labels) == 7
     assert labels == expected_dict
 
 
 def test_create_job_configs_labels_length_limit_met_and_labels_is_none():
     log_adapter.get_and_reset_api_methods()
-    df = bpd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
+    df = bpd.DataFrame(
+        {"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
+    )
     # Test running methods more than the labels' length limit
     for i in range(66):
         df.head()
@@ -114,7 +118,9 @@ def test_create_job_configs_labels_length_limit_met():
         value = f"test{i}"
         cur_labels[key] = value
     # If cur_labels length is 62, we can only add one label from api_methods
-    df = bpd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
+    df = bpd.DataFrame(
+        {"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
+    )
     # Test running two methods
     df.head()
     df.max()
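For context, a minimal sketch (not part of the diff above) of the size-and-dtype check that the reworked `Session._read_pandas` uses to choose between inlining and a load job. The `would_inline` helper and the sample frames are hypothetical names invented for this illustration, and it assumes pandas 2.x so that `pd.ArrowDtype` is available.

```python
import pandas as pd

# Mirrors the constant this patch moves into bigframes/session/__init__.py.
MAX_INLINE_DF_SIZE = 5000


def would_inline(df: pd.DataFrame) -> bool:
    """Approximate the patched dispatch: True means the frame would be inlined."""
    return df.size < MAX_INLINE_DF_SIZE and not any(
        isinstance(s.dtype, pd.ArrowDtype)  # Arrow-backed dtypes still need a load job
        or (len(s) > 0 and pd.api.types.is_list_like(s.iloc[0]))  # nested values
        or pd.api.types.is_datetime64_any_dtype(s)  # datetime columns
        for _, s in df.items()
    )


small = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
large = pd.DataFrame({"col1": range(10_000)})

print(would_inline(small))  # True: served by _read_pandas_inline, no BigQuery load job
print(would_inline(large))  # False: falls back to _read_pandas_load_job
```

Frames that fail the check keep going through the existing load-job path, so behavior for large, Arrow-backed, nested, or datetime data is unchanged.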