deps: support pandas 2.2 #492

Merged · 16 commits · Mar 22, 2024 · Changes shown from 8 commits
28 changes: 18 additions & 10 deletions bigframes/core/joins/merge.py
```diff
@@ -18,15 +18,18 @@

 from __future__ import annotations

+import typing
 from typing import Literal, Optional

-from bigframes.dataframe import DataFrame
-from bigframes.series import Series
+# Avoid circular imports.
+if typing.TYPE_CHECKING:
+    import bigframes.dataframe
+    import bigframes.series


 def merge(
-    left: DataFrame,
-    right: DataFrame,
+    left: bigframes.dataframe.DataFrame,
+    right: bigframes.dataframe.DataFrame,
     how: Literal[
         "inner",
         "left",
@@ -40,7 +43,7 @@ def merge(
     right_on: Optional[str] = None,
     sort: bool = False,
     suffixes: tuple[str, str] = ("_x", "_y"),
-) -> DataFrame:
+) -> bigframes.dataframe.DataFrame:
     left = _validate_operand(left)
     right = _validate_operand(right)

@@ -55,14 +58,19 @@
     )


-def _validate_operand(obj: DataFrame | Series) -> DataFrame:
-    if isinstance(obj, DataFrame):
+def _validate_operand(
+    obj: bigframes.dataframe.DataFrame | bigframes.series.Series,
+) -> bigframes.dataframe.DataFrame:
+    import bigframes.dataframe
+    import bigframes.series
+
+    if isinstance(obj, bigframes.dataframe.DataFrame):
         return obj
-    elif isinstance(obj, Series):
+    elif isinstance(obj, bigframes.series.Series):
         if obj.name is None:
-            raise ValueError("Cannot merge a Series without a name")
+            raise ValueError("Cannot merge a bigframes.series.Series without a name")
         return obj.to_frame()
     else:
         raise TypeError(
-            f"Can only merge Series or DataFrame objects, a {type(obj)} was passed"
+            f"Can only merge bigframes.series.Series or bigframes.dataframe.DataFrame objects, a {type(obj)} was passed"
         )
```

Review thread on the deferred imports:

Contributor: I like disambiguating "DataFrame"... should we just always qualify it?

Collaborator (author): Yes we should. I have a bug open to do that (Issue 296390934). Fully qualified names will also help with docs rendering.
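The diff above uses the standard pattern for breaking an import cycle: names are imported only under `typing.TYPE_CHECKING` for annotations (which `from __future__ import annotations` keeps lazy), and re-imported inside function bodies where a runtime object is actually needed. A minimal sketch of the pattern, with hypothetical module names that are not part of this PR:

```python
# module_a.py - minimal sketch; module_b and Thing are illustrative names.
from __future__ import annotations  # annotations are stored as lazy strings

import typing

if typing.TYPE_CHECKING:
    # Seen only by static type checkers, never executed at runtime, so a
    # module_b -> module_a -> module_b cycle cannot occur at import time.
    import module_b


def process(item: module_b.Thing) -> module_b.Thing:
    # Deferred runtime import: both modules are fully initialized by now.
    import module_b

    if not isinstance(item, module_b.Thing):
        raise TypeError(f"expected module_b.Thing, got {type(item)}")
    return item
```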
13 changes: 10 additions & 3 deletions bigframes/core/utils.py
```diff
@@ -70,7 +70,9 @@ def split_index(


 def get_standardized_ids(
-    col_labels: Iterable[Hashable], idx_labels: Iterable[Hashable] = ()
+    col_labels: Iterable[Hashable],
+    idx_labels: Iterable[Hashable] = (),
+    strict: bool = False,
 ) -> tuple[list[str], list[str]]:
     """Get standardized column ids as column_ids_list, index_ids_list.
     The standardized_column_id must be valid BQ SQL schema column names, can only be string type and unique.
@@ -84,11 +86,15 @@
         Tuple of (standardized_column_ids, standardized_index_ids)
     """
     col_ids = [
-        UNNAMED_COLUMN_ID if col_label is None else label_to_identifier(col_label)
+        UNNAMED_COLUMN_ID
+        if col_label is None
+        else label_to_identifier(col_label, strict=strict)
         for col_label in col_labels
     ]
     idx_ids = [
-        UNNAMED_INDEX_ID if idx_label is None else label_to_identifier(idx_label)
+        UNNAMED_INDEX_ID
+        if idx_label is None
+        else label_to_identifier(idx_label, strict=strict)
         for idx_label in idx_labels
     ]

@@ -107,6 +113,7 @@ def label_to_identifier(label: typing.Hashable, strict: bool = False) -> str:
     # Column values will be loaded as null if the column name has spaces.
     # https://github.com/googleapis/python-bigquery/issues/1566
     identifier = str(label).replace(" ", "_")
+
     if strict:
         identifier = re.sub(r"[^a-zA-Z0-9_]", "", identifier)
         if not identifier:
```
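Roughly, `strict=True` makes `label_to_identifier` collapse a label to BigQuery-safe characters only. A sketch of the behavior visible in this hunk; the diff truncates what happens to a fully-stripped label, so the fallback name below is an assumption:

```python
import re


def to_strict_identifier(label: object, fallback: str = "column") -> str:
    # Spaces break BigQuery parquet loads, so replace them first
    # (https://github.com/googleapis/python-bigquery/issues/1566) ...
    identifier = str(label).replace(" ", "_")
    # ... then strict mode drops every remaining non [a-zA-Z0-9_] character.
    identifier = re.sub(r"[^a-zA-Z0-9_]", "", identifier)
    # Fallback for an empty result is assumed here, not shown in the diff.
    return identifier or fallback


print(to_strict_identifier("my col (USD)"))  # my_col_USD
print(to_strict_identifier("日本語"))          # column
```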
2 changes: 1 addition & 1 deletion bigframes/dataframe.py
```diff
@@ -70,10 +70,10 @@
 if typing.TYPE_CHECKING:
     import bigframes.session

-SingleItemValue = Union[bigframes.series.Series, int, float, Callable]

 LevelType = typing.Hashable
 LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
+SingleItemValue = Union[bigframes.series.Series, int, float, Callable]

 ERROR_IO_ONLY_GS_PATHS = f"Only Google Cloud Storage (gs://...) paths are supported. {constants.FEEDBACK_LINK}"
 ERROR_IO_REQUIRES_WILDCARD = (
```
30 changes: 25 additions & 5 deletions bigframes/session/__init__.py
```diff
@@ -83,7 +83,6 @@
 import bigframes.core.ordering as orderings
 import bigframes.core.traversal as traversals
 import bigframes.core.utils as utils
-import bigframes.dataframe as dataframe
 import bigframes.dtypes
 import bigframes.formatting_helpers as formatting_helpers
 from bigframes.functions.remote_function import read_gbq_function as bigframes_rgf
@@ -92,6 +91,10 @@
 import bigframes.session.clients
 import bigframes.version

+# Avoid circular imports.
+if typing.TYPE_CHECKING:
+    import bigframes.dataframe as dataframe
+
 _BIGFRAMES_DEFAULT_CONNECTION_ID = "bigframes-default-connection"

 _MAX_CLUSTER_COLUMNS = 4
@@ -532,6 +535,8 @@ def _read_gbq_query(
         api_name: str = "read_gbq_query",
         use_cache: bool = True,
     ) -> dataframe.DataFrame:
+        import bigframes.dataframe as dataframe
+
         if isinstance(index_col, str):
             index_cols = [index_col]
         else:
@@ -706,6 +711,8 @@ def _read_gbq_table(
         api_name: str,
         use_cache: bool = True,
     ) -> dataframe.DataFrame:
+        import bigframes.dataframe as dataframe
+
         if max_results and max_results <= 0:
             raise ValueError("`max_results` should be a positive number.")

@@ -945,6 +952,8 @@ def read_pandas(self, pandas_dataframe: pandas.DataFrame) -> dataframe.DataFrame
     def _read_pandas(
         self, pandas_dataframe: pandas.DataFrame, api_name: str
     ) -> dataframe.DataFrame:
+        import bigframes.dataframe as dataframe
+
         if isinstance(pandas_dataframe, dataframe.DataFrame):
             raise ValueError(
                 "read_pandas() expects a pandas.DataFrame, but got a "
@@ -959,6 +968,8 @@ def _read_pandas(
     def _read_pandas_inline(
         self, pandas_dataframe: pandas.DataFrame
     ) -> Optional[dataframe.DataFrame]:
+        import bigframes.dataframe as dataframe
+
         if pandas_dataframe.size > MAX_INLINE_DF_SIZE:
             return None

@@ -980,11 +991,20 @@ def _read_pandas_inline(
     def _read_pandas_load_job(
         self, pandas_dataframe: pandas.DataFrame, api_name: str
     ) -> dataframe.DataFrame:
+        import bigframes.dataframe as dataframe
+
+        col_index = pandas_dataframe.columns.copy()
         col_labels, idx_labels = (
-            pandas_dataframe.columns.to_list(),
+            col_index.to_list(),
             pandas_dataframe.index.names,
         )
-        new_col_ids, new_idx_ids = utils.get_standardized_ids(col_labels, idx_labels)
+        new_col_ids, new_idx_ids = utils.get_standardized_ids(
+            col_labels,
+            idx_labels,
+            # Loading parquet files into BigQuery with special column names
+            # is only supported under an allowlist.
+            strict=True,
+        )

         # Add order column to pandas DataFrame to preserve order in BigQuery
         ordering_col = "rowid"
@@ -1003,7 +1023,7 @@ def _read_pandas_load_job(
         # Specify the datetime dtypes, which is auto-detected as timestamp types.
         schema: list[bigquery.SchemaField] = []
-        for column, dtype in zip(pandas_dataframe.columns, pandas_dataframe.dtypes):
+        for column, dtype in zip(new_col_ids, pandas_dataframe.dtypes):
             if dtype == "timestamp[us][pyarrow]":
                 schema.append(
                     bigquery.SchemaField(column, bigquery.enums.SqlTypeNames.DATETIME)
@@ -1057,7 +1077,7 @@ def _read_pandas_load_job(
         block = blocks.Block(
             array_value,
             index_columns=new_idx_ids,
-            column_labels=col_labels,
+            column_labels=col_index,
             index_labels=idx_labels,
         )
         return dataframe.DataFrame(block)
```
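Net effect of the `_read_pandas_load_job` changes: data is staged and loaded under strict, sanitized column ids, while the original labels are preserved in `col_index` and reattached to the resulting block, so callers still see their exact column names. A simplified pandas-only sketch of that round trip (the sanitized names are illustrative):

```python
import pandas as pd

df = pd.DataFrame({"my col!": [1, 2], "total (USD)": [3, 4]})

col_index = df.columns.copy()        # exact labels, kept for later
sanitized = ["my_col", "total_USD"]  # what get_standardized_ids(..., strict=True) could yield

# The upload (parquet to BigQuery in the real code) happens under safe names...
staged = df.set_axis(sanitized, axis="columns")

# ...and the original labels are restored afterwards, as
# Block(..., column_labels=col_index) does in the diff.
result = staged.set_axis(col_index, axis="columns")
assert list(result.columns) == ["my col!", "total (USD)"]
```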
3 changes: 1 addition & 2 deletions bigframes/session/_io/bigquery.py
```diff
@@ -29,7 +29,6 @@
 import bigframes
 from bigframes.core import log_adapter
 import bigframes.formatting_helpers as formatting_helpers
-import bigframes.session._io.bigquery as bigframes_io

 IO_ORDERING_ID = "bqdf_row_nums"
 MAX_LABELS_COUNT = 64
@@ -225,7 +224,7 @@ def start_query_with_client(
         Starts query job and waits for results.
     """
     api_methods = log_adapter.get_and_reset_api_methods()
-    job_config.labels = bigframes_io.create_job_configs_labels(
+    job_config.labels = create_job_configs_labels(
         job_configs_labels=job_config.labels, api_methods=api_methods
     )
```
2 changes: 1 addition & 1 deletion setup.py
```diff
@@ -46,7 +46,7 @@
     "google-cloud-storage >=2.0.0",
     "ibis-framework[bigquery] >=8.0.0,<9.0.0dev",
     # TODO: Relax upper bound once we have fixed `system_prerelease` tests.
-    "pandas >=1.5.0,<2.1.4",
+    "pandas >=1.5.0",
     "pydata-google-auth >=1.8.2",
     "requests >=2.27.1",
     "scikit-learn >=1.2.2",
```
4 changes: 3 additions & 1 deletion tests/system/small/test_dataframe.py
```diff
@@ -2499,6 +2499,8 @@ def test_df_pivot(scalars_dfs, values, index, columns):
     pd_result = scalars_pandas_df.pivot(values=values, index=index, columns=columns)

     # Pandas produces NaN, where bq dataframes produces pd.NA
+    bf_result = bf_result.fillna(float("nan"))
+    pd_result = pd_result.fillna(float("nan"))
     pd.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False)


@@ -4018,7 +4020,7 @@ def test_to_pandas_downsampling_option_override(session):

     total_memory_bytes = df.memory_usage(deep=True).sum()
     total_memory_mb = total_memory_bytes / (1024 * 1024)
-    assert total_memory_mb == pytest.approx(download_size, rel=0.3)
+    assert total_memory_mb == pytest.approx(download_size, rel=0.5)


 def test_to_gbq_and_create_dataset(session, scalars_df_index, dataset_id_not_created):
```
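The pivot fix unifies how the two frames spell "missing" (pandas emits NaN where BigQuery DataFrames emits `pd.NA`) before `assert_frame_equal` runs. A minimal sketch of the same normalization, reduced to object-dtype frames for clarity; the real test operates on the pivot outputs' nullable columns:

```python
import numpy as np
import pandas as pd

left = pd.DataFrame({"a": [1.0, pd.NA]}, dtype="object")    # pd.NA-style missing
right = pd.DataFrame({"a": [1.0, np.nan]}, dtype="object")  # NaN-style missing

# fillna(float("nan")) rewrites pd.NA as NaN, so both sides agree on the
# representation of missing values before the comparison.
left = left.fillna(float("nan"))
right = right.fillna(float("nan"))

pd.testing.assert_frame_equal(left, right, check_dtype=False)
```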
4 changes: 3 additions & 1 deletion tests/system/small/test_groupby.py
```diff
@@ -228,7 +228,9 @@ def test_dataframe_groupby_multi_sum(
         (lambda x: x.cumsum(numeric_only=True)),
         (lambda x: x.cummax(numeric_only=True)),
         (lambda x: x.cummin(numeric_only=True)),
-        (lambda x: x.cumprod()),
+        # pandas 2.2 uses floating point for cumulative product even for
+        # integer inputs.
+        (lambda x: x.cumprod().astype("Float64")),
         (lambda x: x.shift(periods=2)),
     ],
     ids=[
```
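As the new comment says, pandas 2.2 returns floating point from a cumulative product even for integer input, so the expected values are normalized to `Float64`. A small illustration of the normalization:

```python
import pandas as pd

s = pd.Series([1, 2, 3, 4], dtype="Int64")
keys = pd.Series(["a", "a", "b", "b"])

# pandas < 2.2 keeps an integer-like result here; pandas >= 2.2 produces
# floating point. Casting makes the comparison version-independent.
result = s.groupby(keys).cumprod().astype("Float64")
print(result)  # 1.0, 2.0, 3.0, 12.0
```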
3 changes: 2 additions & 1 deletion tests/system/small/test_multiindex.py
```diff
@@ -45,8 +45,9 @@ def test_read_pandas_multi_index_axes():
         [[1, 2], [3, 4]], index=index, columns=columns, dtype=pandas.Int64Dtype()
     )
     bf_df = bpd.DataFrame(pandas_df)
+    bf_df_computed = bf_df.to_pandas()

-    pandas.testing.assert_frame_equal(bf_df.to_pandas(), pandas_df)
+    pandas.testing.assert_frame_equal(bf_df_computed, pandas_df)


 # Row Multi-index tests
```
25 changes: 20 additions & 5 deletions tests/system/small/test_series.py
```diff
@@ -1497,7 +1498,8 @@ def test_groupby_prod(scalars_dfs):
         (lambda x: x.cumcount()),
         (lambda x: x.cummin()),
         (lambda x: x.cummax()),
-        (lambda x: x.cumprod()),
+        # Pandas 2.2 casts cumprod to float.
+        (lambda x: x.cumprod().astype("Float64")),
         (lambda x: x.diff()),
         (lambda x: x.shift(2)),
         (lambda x: x.shift(-2)),
@@ -1521,7 +1522,7 @@ def test_groupby_window_ops(scalars_df_index, scalars_pandas_df_index, operator):
     ).to_pandas()
     pd_series = operator(
         scalars_pandas_df_index[col_name].groupby(scalars_pandas_df_index[group_key])
-    ).astype(pd.Int64Dtype())
+    ).astype(bf_series.dtype)
     pd.testing.assert_series_equal(
         pd_series,
         bf_series,
@@ -2694,7 +2695,14 @@ def foo(x):
         ("timestamp_col", "time64[us][pyarrow]"),
         ("timestamp_col", pd.ArrowDtype(pa.timestamp("us"))),
         ("datetime_col", "date32[day][pyarrow]"),
-        ("datetime_col", "string[pyarrow]"),
+        pytest.param(
+            "datetime_col",
+            "string[pyarrow]",
+            marks=pytest.mark.skipif(
+                pd.__version__.startswith("2.2"),
+                reason="pandas 2.2 uses T as date/time separator whereas earlier versions use space",
+            ),
+        ),
         ("datetime_col", "time64[us][pyarrow]"),
         ("datetime_col", pd.ArrowDtype(pa.timestamp("us", tz="UTC"))),
         ("date_col", "string[pyarrow]"),
@@ -3279,7 +3287,10 @@ def test_apply_lambda(scalars_dfs, col, lambda_):
     bf_result = bf_col.apply(lambda_, by_row=False).to_pandas()

     pd_col = scalars_pandas_df[col]
-    pd_result = pd_col.apply(lambda_)
+    if pd.__version__.startswith("2.2"):
+        pd_result = pd_col.apply(lambda_, by_row=False)
+    else:
+        pd_result = pd_col.apply(lambda_)

     # ignore dtype check, which are Int64 and object respectively
     assert_series_equal(bf_result, pd_result, check_dtype=False)
@@ -3330,7 +3341,11 @@ def foo(x):
     bf_result = bf_col.apply(foo, by_row=False).to_pandas()

     pd_col = scalars_pandas_df["int64_col"]
-    pd_result = pd_col.apply(foo)
+
+    if pd.__version__.startswith("2.2"):
+        pd_result = pd_col.apply(foo, by_row=False)
+    else:
+        pd_result = pd_col.apply(foo)

     # ignore dtype check, which are Int64 and object respectively
     assert_series_equal(bf_result, pd_result, check_dtype=False)
```
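The `pd.__version__.startswith("2.2")` gate appears twice above; it could be factored into a tiny shim. A hypothetical helper (not part of this PR) mirroring the tests' logic:

```python
import pandas as pd


def apply_compat(series: pd.Series, func):
    """Call Series.apply the same way across pandas versions.

    The tests pass by_row=False on pandas 2.2 to match the BigQuery
    DataFrames behavior under test; on the older pandas versions still in
    CI they call apply without the keyword.
    """
    if pd.__version__.startswith("2.2"):
        return series.apply(func, by_row=False)
    return series.apply(func)


# Usage mirroring the tests:
pd_result = apply_compat(pd.Series([1, 2, 3]), lambda x: x + 1)
```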
7 changes: 6 additions & 1 deletion tests/unit/test_pandas.py
```diff
@@ -21,6 +21,7 @@
 import pytest

 import bigframes.core.global_session
+import bigframes.dataframe
 import bigframes.pandas as bpd
 import bigframes.session

@@ -67,7 +68,11 @@ def test_method_matches_session(method_name: str):

     # Add `eval_str = True` so that deferred annotations are turned into their
     # corresponding type objects. Need Python 3.10 for eval_str parameter.
-    session_signature = inspect.signature(session_method, eval_str=True)
+    session_signature = inspect.signature(
+        session_method,
+        eval_str=True,
+        globals={**vars(bigframes.session), **{"dataframe": bigframes.dataframe}},
+    )
     pandas_signature = inspect.signature(pandas_method, eval_str=True)
     assert [
         # Kind includes position, which will be an offset.
```
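Background for the `globals=` argument: with deferred annotations, `inspect.signature(..., eval_str=True)` has to evaluate strings like "dataframe.DataFrame", and since this PR moved `import bigframes.dataframe as dataframe` under `TYPE_CHECKING`, that name no longer exists in the session module's runtime namespace, so the test injects it explicitly. A self-contained sketch of the mechanism, with illustrative names:

```python
from __future__ import annotations  # all annotations are stored as strings

import inspect
import typing

if typing.TYPE_CHECKING:
    import decimal  # deferred: absent from module globals at runtime


def price(value: decimal.Decimal) -> decimal.Decimal:
    return value


import decimal as _decimal  # bound as _decimal, so "decimal" stays undefined

# inspect.signature(price, eval_str=True) would raise NameError here; the
# injected globals supply the missing name, as the test does for "dataframe".
sig = inspect.signature(price, eval_str=True, globals={"decimal": _decimal})
print(sig)  # (value: decimal.Decimal) -> decimal.Decimal
```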
10 changes: 5 additions & 5 deletions third_party/bigframes_vendored/pandas/core/frame.py
```diff
@@ -13,7 +13,7 @@

 from typing import Hashable, Iterable, Literal, Mapping, Optional, Sequence, Union

-from bigframes_vendored.pandas.core.generic import NDFrame
+import bigframes_vendored.pandas.core.generic as generic
 import numpy as np
 import pandas as pd

@@ -23,7 +23,7 @@
 # DataFrame class


-class DataFrame(NDFrame):
+class DataFrame(generic.NDFrame):
     """Two-dimensional, size-mutable, potentially heterogeneous tabular data.

     Data structure also contains labeled axes (rows and columns).
@@ -592,7 +592,7 @@ def to_records(
             >>> df = bpd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
             >>> df.to_records()
             rec.array([(0, 1, 3), (1, 2, 4)],
-                      dtype=[('index', 'O'), ('col1', 'O'), ('col2', 'O')])
+                      dtype=[('index', '<i8'), ('col1', '<i8'), ('col2', '<i8')])

         Args:
             index (bool, default True):
@@ -4403,7 +4403,7 @@ def cumprod(self) -> DataFrame:
     def diff(
         self,
         periods: int = 1,
-    ) -> NDFrame:
+    ) -> generic.NDFrame:
         """First discrete difference of element.

         Calculates the difference of a DataFrame element compared with another
@@ -4751,7 +4751,7 @@ def index(self):
             >>> df.index # doctest: +ELLIPSIS
             Index([10, 20, 30], dtype='Int64')
             >>> df.index.values
-            array([10, 20, 30], dtype=object)
+            array([10, 20, 30])

         Let's try setting a new index for the dataframe and see that reflect via
         ``index`` property.
```