Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: update df.corr, df.cov to be used with more than 30 columns case. #1161

Merged
merged 17 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 4 additions & 45 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,14 +912,17 @@ def multi_apply_unary_op(
input_varname = input_varnames[0]

block = self

result_ids = []
for col_id in columns:
label = self.col_id_to_label[col_id]
block, result_id = block.project_expr(
expr.bind_variables({input_varname: ex.deref(col_id)}),
label=label,
)
block = block.copy_values(result_id, col_id)
block = block.drop_columns([result_id])
result_ids.append(result_id)
block = block.drop_columns(result_ids)
# Special case, we can preserve transpose cache for full-frame unary ops
if (self._transpose_cache is not None) and set(self.value_columns) == set(
columns
Expand Down Expand Up @@ -1317,50 +1320,6 @@ def summarize(
index_columns=index_cols,
)

def calculate_pairwise_metric(self, op=agg_ops.CorrOp()):
    """
    Build a block holding a pairwise metric for every pair of value columns.

    `op` selects the binary aggregation applied to each (left, right) column
    pair; it defaults to the correlation op, and a covariance op may be
    passed instead.

    Raises NotImplementedError when the block has more than 30 value columns.
    """
    value_cols = self.value_columns
    input_count = len(value_cols)
    if input_count > 30:
        raise NotImplementedError(
            "This function supports dataframes with 30 columns or fewer. "
            f"Provided dataframe has {len(self.value_columns)} columns. {constants.FEEDBACK_LINK}"
        )

    # One aggregation per ordered (left, right) pair, left-major, so the
    # aggregated columns come out in groups of `input_count` per left column.
    aggregations = []
    for left_col in value_cols:
        for right_col in value_cols:
            pair_agg = ex.BinaryAggregation(
                op, ex.deref(left_col), ex.deref(right_col)
            )
            aggregations.append((pair_agg, f"{left_col}-{right_col}"))
    expr = self.expr.aggregate(aggregations)

    # Slice the flat list of aggregated column ids into consecutive groups
    # of `input_count` ids each.
    grouped_ids = []
    for group in range(input_count):
        start = input_count * group
        grouped_ids.append(tuple(expr.column_ids[start : start + input_count]))
    unpivot_columns = tuple(grouped_ids)

    labels = self._get_labels_for_columns(value_cols)

    # TODO(b/340896143): fix type error
    expr, (index_col_ids, _, _) = unpivot(
        expr,
        row_labels=labels,
        unpivot_columns=unpivot_columns,
    )

    return Block(
        expr,
        column_labels=self.column_labels,
        index_columns=index_col_ids,
        index_labels=self.column_labels.names,
    )

def explode(
self,
column_ids: typing.Sequence[str],
Expand Down
192 changes: 190 additions & 2 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1197,15 +1197,203 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr
else:
frame = self._drop_non_numeric()

return DataFrame(frame._block.calculate_pairwise_metric(op=agg_ops.CorrOp()))
orig_columns = frame.columns
# Replace column names with 0 to n - 1 to keep order
# and avoid the influence of duplicated column name
frame.columns = pandas.Index(range(len(orig_columns)))
frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE)
block = frame._block

# A new column that uniquely identifies each row
block, ordering_col = frame._block.promote_offsets(label="_bigframes_idx")

val_col_ids = [
col_id for col_id in block.value_columns if col_id != ordering_col
]

block = block.melt(
[ordering_col], val_col_ids, ["_bigframes_variable"], "_bigframes_value"
)

block = block.merge(
block,
left_join_ids=[ordering_col],
right_join_ids=[ordering_col],
how="inner",
sort=False,
)

frame = DataFrame(block)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: collapse this line with the line below?

frame = frame.dropna(subset=["_bigframes_value_x", "_bigframes_value_y"])

paired_mean_frame = (
frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
.agg(
_bigframes_paired_mean_x=bigframes.pandas.NamedAgg(
column="_bigframes_value_x", aggfunc="mean"
),
_bigframes_paired_mean_y=bigframes.pandas.NamedAgg(
column="_bigframes_value_y", aggfunc="mean"
),
)
.reset_index()
)

frame = frame.merge(
paired_mean_frame, on=["_bigframes_variable_x", "_bigframes_variable_y"]
)
frame["_bigframes_value_x"] -= frame["_bigframes_paired_mean_x"]
frame["_bigframes_value_y"] -= frame["_bigframes_paired_mean_y"]

frame["_bigframes_dividend"] = (
frame["_bigframes_value_x"] * frame["_bigframes_value_y"]
)
frame["_bigframes_x_square"] = (
frame["_bigframes_value_x"] * frame["_bigframes_value_x"]
)
frame["_bigframes_y_square"] = (
frame["_bigframes_value_y"] * frame["_bigframes_value_y"]
)

result = (
frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
.agg(
_bigframes_dividend_sum=bigframes.pandas.NamedAgg(
column="_bigframes_dividend", aggfunc="sum"
),
_bigframes_x_square_sum=bigframes.pandas.NamedAgg(
column="_bigframes_x_square", aggfunc="sum"
),
_bigframes_y_square_sum=bigframes.pandas.NamedAgg(
column="_bigframes_y_square", aggfunc="sum"
),
)
.reset_index()
)
result["_bigframes_corr"] = result["_bigframes_dividend_sum"] / (
(
result["_bigframes_x_square_sum"] * result["_bigframes_y_square_sum"]
)._apply_unary_op(ops.sqrt_op)
)
result = result._pivot(
index="_bigframes_variable_x",
columns="_bigframes_variable_y",
values="_bigframes_corr",
)

map_data = {
f"_bigframes_level_{i}": orig_columns.get_level_values(i)
for i in range(orig_columns.nlevels)
}
map_data["_bigframes_keys"] = range(len(orig_columns))
map_df = bigframes.dataframe.DataFrame(
map_data,
session=self._get_block().expr.session,
).set_index("_bigframes_keys")
result = result.join(map_df)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: collapse with the line below?

result = result.sort_index()
index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)]
result = result.set_index(index_columns)
result.index.names = orig_columns.names
result.columns = orig_columns

return result

def cov(self, *, numeric_only: bool = False) -> DataFrame:
    """Compute the pairwise sample covariance between the columns.

    For every ordered pair of columns, only the rows where both values are
    non-null are used. The covariance is the sum of the products of the
    deviations from the per-pair means, divided by ``count - 1``.

    Args:
        numeric_only: When False, the frame is checked with
            ``_raise_on_non_numeric``; when True, it is filtered through
            ``_drop_non_numeric`` before computing.

    Returns:
        DataFrame: A frame whose index and columns are both labelled with
        the original column labels, holding the covariance of each pair.
    """
    if not numeric_only:
        # Report this method's own name; presumably the argument is used in
        # the error message for non-numeric columns -- verify against helper.
        frame = self._raise_on_non_numeric("cov")
    else:
        frame = self._drop_non_numeric()

    orig_columns = frame.columns
    # NOTE(review): the helpers above may return `self`; relabel a copy so
    # that the caller's DataFrame never has its columns renamed in place.
    frame = frame.copy()
    # Replace column names with 0 to n - 1 to keep order
    # and avoid the influence of duplicated column name
    frame.columns = pandas.Index(range(len(orig_columns)))
    frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE)

    # A new column that uniquely identifies each row
    block, ordering_col = frame._block.promote_offsets(label="_bigframes_idx")

    val_col_ids = [
        col_id for col_id in block.value_columns if col_id != ordering_col
    ]

    # Long format: one (row id, variable, value) record per original cell.
    block = block.melt(
        [ordering_col], val_col_ids, ["_bigframes_variable"], "_bigframes_value"
    )
    # Self-join on the row id to pair every variable with every other
    # variable of the same row (columns get the _x / _y suffixes used below).
    block = block.merge(
        block,
        left_join_ids=[ordering_col],
        right_join_ids=[ordering_col],
        how="inner",
        sort=False,
    )

    # Keep only the rows where both members of the pair are present.
    frame = DataFrame(block).dropna(
        subset=["_bigframes_value_x", "_bigframes_value_y"]
    )

    # Means are computed per pair, over the rows that survived the dropna.
    paired_mean_frame = (
        frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
        .agg(
            _bigframes_paired_mean_x=bigframes.pandas.NamedAgg(
                column="_bigframes_value_x", aggfunc="mean"
            ),
            _bigframes_paired_mean_y=bigframes.pandas.NamedAgg(
                column="_bigframes_value_y", aggfunc="mean"
            ),
        )
        .reset_index()
    )

    frame = frame.merge(
        paired_mean_frame, on=["_bigframes_variable_x", "_bigframes_variable_y"]
    )
    # Center both values on their per-pair means.
    frame["_bigframes_value_x"] -= frame["_bigframes_paired_mean_x"]
    frame["_bigframes_value_y"] -= frame["_bigframes_paired_mean_y"]

    frame["_bigframes_dividend"] = (
        frame["_bigframes_value_x"] * frame["_bigframes_value_y"]
    )

    result = (
        frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
        .agg(
            _bigframes_dividend_sum=bigframes.pandas.NamedAgg(
                column="_bigframes_dividend", aggfunc="sum"
            ),
            _bigframes_dividend_count=bigframes.pandas.NamedAgg(
                column="_bigframes_dividend", aggfunc="count"
            ),
        )
        .reset_index()
    )
    # Sample covariance: divide by (number of paired observations - 1).
    result["_bigframes_cov"] = result["_bigframes_dividend_sum"] / (
        result["_bigframes_dividend_count"] - 1
    )
    result = result._pivot(
        index="_bigframes_variable_x",
        columns="_bigframes_variable_y",
        values="_bigframes_cov",
    )

    # Map the positional keys 0..n-1 back to the original column labels,
    # one helper column per level of the original column index.
    map_data = {
        f"_bigframes_level_{i}": orig_columns.get_level_values(i)
        for i in range(orig_columns.nlevels)
    }
    map_data["_bigframes_keys"] = range(len(orig_columns))
    map_df = bigframes.dataframe.DataFrame(
        map_data,
        session=self._get_block().expr.session,
    ).set_index("_bigframes_keys")
    # Sorting by the positional key restores the original column order
    # before the keys are swapped for the real labels.
    result = result.join(map_df).sort_index()
    index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)]
    result = result.set_index(index_columns)
    result.index.names = orig_columns.names
    result.columns = orig_columns

    return result

def to_arrow(
self,
Expand Down
22 changes: 22 additions & 0 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,28 @@ def scalars_dfs_maybe_ordered(
)


@pytest.fixture(scope="session")
def scalars_df_numeric_150_columns_maybe_ordered(
    maybe_ordered_session,
    scalars_pandas_df_index,
):
    """Provide a wide test frame: five columns repeated 30 times (150 columns).

    Returns a tuple of the frame loaded through
    ``maybe_ordered_session.read_pandas`` and the pandas frame it was built from.
    """
    # TODO(b/379911038): After the error is fixed, add numeric type.
    base_columns = [
        "rowindex",
        "rowindex_2",
        "float64_col",
        "int64_col",
        "int64_too",
    ]
    # Repeating the selection list yields duplicated column labels on purpose.
    repeated_columns = base_columns * 30
    flat_pandas_df = scalars_pandas_df_index.reset_index(drop=False)
    wide_pandas_df = flat_pandas_df[repeated_columns]

    wide_df = maybe_ordered_session.read_pandas(wide_pandas_df)
    return (wide_df, wide_pandas_df)


@pytest.fixture(scope="session")
def hockey_df(
hockey_table_id: str, session: bigframes.Session
Expand Down
42 changes: 42 additions & 0 deletions tests/system/large/test_dataframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import sys

import pandas as pd
import pytest


@pytest.mark.skipif(
    sys.version_info >= (3, 12),
    # See: https://github.com/python/cpython/issues/112282
    reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.",
)
def test_corr_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered):
    """Check `corr(numeric_only=True)` against pandas on the 150-column fixture."""
    bf_df, pd_df = scalars_df_numeric_150_columns_maybe_ordered

    expected = pd_df.corr(numeric_only=True)
    actual = bf_df.corr(numeric_only=True).to_pandas()

    # Only labels and values are compared; dtype, index-type and
    # column-type checks are switched off.
    relaxed_checks = dict(
        check_dtype=False,
        check_index_type=False,
        check_column_type=False,
    )
    pd.testing.assert_frame_equal(actual, expected, **relaxed_checks)


@pytest.mark.skipif(
    sys.version_info >= (3, 12),
    # See: https://github.com/python/cpython/issues/112282
    reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.",
)
def test_cov_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered):
    """Check `cov(numeric_only=True)` against pandas on the 150-column fixture."""
    bf_df, pd_df = scalars_df_numeric_150_columns_maybe_ordered

    expected = pd_df.cov(numeric_only=True)
    actual = bf_df.cov(numeric_only=True).to_pandas()

    # Only labels and values are compared; dtype, index-type and
    # column-type checks are switched off.
    relaxed_checks = dict(
        check_dtype=False,
        check_index_type=False,
        check_column_type=False,
    )
    pd.testing.assert_frame_equal(actual, expected, **relaxed_checks)
9 changes: 4 additions & 5 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2075,8 +2075,8 @@ def test_combine_first(
),
],
)
def test_corr_w_numeric_only(scalars_dfs, columns, numeric_only):
scalars_df, scalars_pandas_df = scalars_dfs
def test_corr_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only):
scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered

bf_result = scalars_df[columns].corr(numeric_only=numeric_only).to_pandas()
pd_result = scalars_pandas_df[columns].corr(numeric_only=numeric_only)
Expand Down Expand Up @@ -2115,11 +2115,10 @@ def test_corr_w_invalid_parameters(scalars_dfs):
),
],
)
def test_cov_w_numeric_only(scalars_dfs, columns, numeric_only):
scalars_df, scalars_pandas_df = scalars_dfs
def test_cov_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only):
scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered
bf_result = scalars_df[columns].cov(numeric_only=numeric_only).to_pandas()
pd_result = scalars_pandas_df[columns].cov(numeric_only=numeric_only)

# BigFrames and Pandas differ in their data type handling:
# - Column types: BigFrames uses Float64, Pandas uses float64.
# - Index types: BigFrames uses string, Pandas uses object.
Expand Down
8 changes: 6 additions & 2 deletions tests/system/small/test_multiindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,9 @@ def test_column_multi_index_unstack(scalars_df_index, scalars_pandas_df_index):

def test_corr_w_multi_index(scalars_df_index, scalars_pandas_df_index):
columns = ["int64_too", "float64_col", "int64_col"]
multi_columns = pandas.MultiIndex.from_tuples(zip(["a", "b", "b"], [1, 2, 2]))
multi_columns = pandas.MultiIndex.from_tuples(
zip(["a", "b", "b"], [1, 2, 2]), names=[None, "level_2"]
)

bf = scalars_df_index[columns].copy()
bf.columns = multi_columns
Expand All @@ -931,7 +933,9 @@ def test_corr_w_multi_index(scalars_df_index, scalars_pandas_df_index):

def test_cov_w_multi_index(scalars_df_index, scalars_pandas_df_index):
columns = ["int64_too", "float64_col", "int64_col"]
multi_columns = pandas.MultiIndex.from_tuples(zip(["a", "b", "b"], [1, 2, 2]))
multi_columns = pandas.MultiIndex.from_tuples(
zip(["a", "b", "b"], [1, 2, 2]), names=["level_1", None]
)

bf = scalars_df_index[columns].copy()
bf.columns = multi_columns
Expand Down