perf: inline read_pandas for small data (#383)

Fixes #312729021 🦕
GarrettWu authored Feb 21, 2024
1 parent 67fd434 commit 59b446b
Showing 5 changed files with 51 additions and 39 deletions.
16 changes: 1 addition & 15 deletions bigframes/dataframe.py
@@ -69,10 +69,6 @@
 import bigframes.session
 
 
-# BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
-# TODO(tbergeron): Convert to bytes-based limit
-MAX_INLINE_DF_SIZE = 5000
-
 LevelType = typing.Hashable
 LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
 SingleItemValue = Union[bigframes.series.Series, int, float, Callable]
@@ -170,17 +166,7 @@ def __init__(
                 columns=columns,  # type:ignore
                 dtype=dtype,  # type:ignore
             )
-            if (
-                pd_dataframe.size < MAX_INLINE_DF_SIZE
-                # TODO(swast): Workaround data types limitation in inline data.
-                and not any(
-                    dt.pyarrow_dtype
-                    for dt in pd_dataframe.dtypes
-                    if isinstance(dt, pandas.ArrowDtype)
-                )
-            ):
-                self._block = blocks.Block.from_local(pd_dataframe)
-            elif session:
+            if session:
                 self._block = session.read_pandas(pd_dataframe)._get_block()
             else:
                 self._block = bigframes.pandas.read_pandas(pd_dataframe)._get_block()
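
With the constructor special case gone, building a DataFrame looks the same to callers; the inline-versus-load-job choice now happens inside the session. A minimal usage sketch (hypothetical values, assuming an active default session):

    import bigframes.pandas as bpd

    # Small, plain-typed data: the session inlines it into the query text.
    df = bpd.DataFrame({"a": [1, 2, 3]})
    # Large frames, or frames with Arrow/nested/datetime columns, instead go
    # through a BigQuery load job (see bigframes/session/__init__.py below).
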
16 changes: 1 addition & 15 deletions bigframes/operations/base.py
@@ -30,10 +30,6 @@
 import bigframes.session
 import third_party.bigframes_vendored.pandas.pandas._typing as vendored_pandas_typing
 
-# BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
-# TODO(tbergeron): Convert to bytes-based limit
-MAX_INLINE_SERIES_SIZE = 5000
-
 
 class SeriesMethods:
     def __init__(
@@ -104,17 +100,7 @@ def __init__(
             if pd_series.name is None:
                 # to_frame will set default numeric column label if unnamed, but we do not support int column label, so must rename
                 pd_dataframe = pd_dataframe.set_axis(["unnamed_col"], axis=1)
-            if (
-                pd_dataframe.size < MAX_INLINE_SERIES_SIZE
-                # TODO(swast): Workaround data types limitation in inline data.
-                and not any(
-                    dt.pyarrow_dtype
-                    for dt in pd_dataframe.dtypes
-                    if isinstance(dt, pd.ArrowDtype)
-                )
-            ):
-                block = blocks.Block.from_local(pd_dataframe)
-            elif session:
+            if session:
                 block = session.read_pandas(pd_dataframe)._get_block()
             else:
                 # Uses default global session
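
The Series constructor gets the identical treatment: convert to a one-column frame, then let read_pandas decide. A usage sketch under the same assumptions:

    import bigframes.pandas as bpd

    # Small Series are inlined just like small DataFrames.
    s = bpd.Series([1, 2, 3], name="x")
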
33 changes: 30 additions & 3 deletions bigframes/session/__init__.py
@@ -108,6 +108,10 @@
     "UTF-32LE",
 }
 
+# BigQuery has 1 MB query size limit, 5000 items shouldn't take more than 10% of this depending on data type.
+# TODO(tbergeron): Convert to bytes-based limit
+MAX_INLINE_DF_SIZE = 5000
+
 logger = logging.getLogger(__name__)
 
@@ -882,6 +886,29 @@ def read_pandas(self, pandas_dataframe: pandas.DataFrame) -> dataframe.DataFrame:
 
     def _read_pandas(
         self, pandas_dataframe: pandas.DataFrame, api_name: str
     ) -> dataframe.DataFrame:
+        if (
+            pandas_dataframe.size < MAX_INLINE_DF_SIZE
+            # TODO(swast): Workaround data types limitation in inline data.
+            and not any(
+                (
+                    isinstance(s.dtype, pandas.ArrowDtype)
+                    or (len(s) > 0 and pandas.api.types.is_list_like(s.iloc[0]))
+                    or pandas.api.types.is_datetime64_any_dtype(s)
+                )
+                for _, s in pandas_dataframe.items()
+            )
+        ):
+            return self._read_pandas_inline(pandas_dataframe)
+        return self._read_pandas_load_job(pandas_dataframe, api_name)
+
+    def _read_pandas_inline(
+        self, pandas_dataframe: pandas.DataFrame
+    ) -> dataframe.DataFrame:
+        return dataframe.DataFrame(blocks.Block.from_local(pandas_dataframe))
+
+    def _read_pandas_load_job(
+        self, pandas_dataframe: pandas.DataFrame, api_name: str
+    ) -> dataframe.DataFrame:
         col_labels, idx_labels = (
             pandas_dataframe.columns.to_list(),
@@ -1079,7 +1106,7 @@ def read_csv(
             encoding=encoding,
             **kwargs,
         )
-        return self.read_pandas(pandas_df)  # type: ignore
+        return self._read_pandas(pandas_df, "read_csv")  # type: ignore
 
     def read_pickle(
         self,
@@ -1096,7 +1123,7 @@ def read_pickle(
         if isinstance(pandas_obj, pandas.Series):
             if pandas_obj.name is None:
                 pandas_obj.name = "0"
-            bigframes_df = self.read_pandas(pandas_obj.to_frame())
+            bigframes_df = self._read_pandas(pandas_obj.to_frame(), "read_pickle")
             return bigframes_df[bigframes_df.columns[0]]
         return self._read_pandas(pandas_obj, "read_pickle")
 
@@ -1196,7 +1223,7 @@ def read_json(
             engine=engine,
             **kwargs,
         )
-        return self.read_pandas(pandas_df)
+        return self._read_pandas(pandas_df, "read_json")
 
     def _check_file_size(self, filepath: str):
         max_size = 1024 * 1024 * 1024  # 1 GB in bytes
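
For readers skimming the predicate above: data is inlined only when it is small and every column has a plain dtype. A standalone sketch of the same check (the function name _can_inline is hypothetical, not library API):

    import pandas

    MAX_INLINE_DF_SIZE = 5000  # guards BigQuery's 1 MB query size limit

    def _can_inline(df: pandas.DataFrame) -> bool:
        if df.size >= MAX_INLINE_DF_SIZE:  # size = rows * columns
            return False
        for _, s in df.items():
            if isinstance(s.dtype, pandas.ArrowDtype):
                return False  # Arrow extension dtypes
            if len(s) > 0 and pandas.api.types.is_list_like(s.iloc[0]):
                return False  # nested/list-like values
            if pandas.api.types.is_datetime64_any_dtype(s):
                return False  # datetime64 columns
        return True

    small = pandas.DataFrame({"a": [1, 2], "b": ["x", "y"]})
    assert _can_inline(small)  # 4 plain scalar cells: eligible for inlining

Note that the check samples only the first value of each column for list-likeness, so it is a heuristic rather than a full scan, and the TODO suggests the item-count threshold will eventually become bytes-based.
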
9 changes: 8 additions & 1 deletion tests/system/small/test_progress_bar.py
@@ -15,10 +15,12 @@
 import re
 import tempfile
 
+import numpy as np
 import pandas as pd
 
 import bigframes as bf
 import bigframes.formatting_helpers as formatting_helpers
+from bigframes.session import MAX_INLINE_DF_SIZE
 
 job_load_message_regex = r"\w+ job [\w-]+ is \w+\."
@@ -66,10 +68,15 @@ def test_progress_bar_extract_jobs(
 def test_progress_bar_load_jobs(
     session: bf.Session, penguins_pandas_df_default_index: pd.DataFrame, capsys
 ):
+    # repeat the DF to be big enough to trigger the load job.
+    df = penguins_pandas_df_default_index
+    while len(df) < MAX_INLINE_DF_SIZE:
+        df = pd.DataFrame(np.repeat(df.values, 2, axis=0))
+
     bf.options.display.progress_bar = "terminal"
     with tempfile.TemporaryDirectory() as dir:
         path = dir + "/test_read_csv_progress_bar*.csv"
-        penguins_pandas_df_default_index.to_csv(path, index=False)
+        df.to_csv(path, index=False)
         capsys.readouterr()  # clear output
         session.read_csv(path)
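
The doubling loop guarantees len(df) >= MAX_INLINE_DF_SIZE, and since df.size >= len(df) (size is rows times columns), the frame can no longer be inlined: read_csv must issue the load job whose progress bar the test asserts on. A quick sketch of the growth, assuming the fixture has the classic penguins row count of 344 (an assumption, not stated in the diff):

    import numpy as np
    import pandas as pd

    df = pd.DataFrame({"a": range(344)})  # stand-in for the penguins fixture
    while len(df) < 5000:  # MAX_INLINE_DF_SIZE
        df = pd.DataFrame(np.repeat(df.values, 2, axis=0))  # double the rows
    print(len(df))  # 344 -> 688 -> 1376 -> 2752 -> 5504
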
16 changes: 11 additions & 5 deletions tests/unit/session/test_io_bigquery.py
@@ -23,6 +23,7 @@
 from bigframes.core import log_adapter
 import bigframes.pandas as bpd
 import bigframes.session._io.bigquery as io_bq
+from tests.unit import resources
 
 
 def test_create_job_configs_labels_is_none():
@@ -64,7 +65,9 @@ def test_create_job_configs_labels_log_adaptor_call_method_under_length_limit():
         "bigframes-api": "read_pandas",
         "source": "bigquery-dataframes-temp",
     }
-    df = bpd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
+    df = bpd.DataFrame(
+        {"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
+    )
     # Test running two methods
     df.head()
     df.max()
@@ -81,15 +84,16 @@ def test_create_job_configs_labels_log_adaptor_call_method_under_length_limit():
         "recent-bigframes-api-2": "dataframe-__init__",
         "recent-bigframes-api-3": "dataframe-head",
         "recent-bigframes-api-4": "dataframe-__init__",
-        "recent-bigframes-api-5": "dataframe-__init__",
     }
     assert labels is not None
     assert len(labels) == 7
     assert labels == expected_dict
 
 
 def test_create_job_configs_labels_length_limit_met_and_labels_is_none():
     log_adapter.get_and_reset_api_methods()
-    df = bpd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
+    df = bpd.DataFrame(
+        {"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
+    )
     # Test running methods more than the labels' length limit
     for i in range(66):
         df.head()
@@ -114,7 +118,9 @@ def test_create_job_configs_labels_length_limit_met():
         value = f"test{i}"
         cur_labels[key] = value
     # If cur_labels length is 62, we can only add one label from api_methods
-    df = bpd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
+    df = bpd.DataFrame(
+        {"col1": [1, 2], "col2": [3, 4]}, session=resources.create_bigquery_session()
+    )
     # Test running two methods
    df.head()
    df.max()
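
These unit tests build literal DataFrames that previously never touched a session, presumably because the old constructor inlined them locally; now that construction always routes through Session.read_pandas, each test injects the mocked session from tests/unit/resources explicitly (and the expected label list apparently drops one dataframe-__init__ entry to match). The updated pattern, as in the diff:

    import bigframes.pandas as bpd
    from tests.unit import resources  # test helper shipped in this repo

    df = bpd.DataFrame(
        {"col1": [1, 2], "col2": [3, 4]},
        session=resources.create_bigquery_session(),  # mock session, no real BigQuery calls
    )
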
