Skip to content

Commit

Permalink
perf: update df.corr, df.cov to be used with more than 30 columns cas…
Browse files Browse the repository at this point in the history
…e. (#1161)

* perf: update df.corr, df.cov to be used with more than 30 columns case.

* add large test

* remove print

* fix_index

* fix index

* test fix

* fix test

* fix test

* slightly improve multi_apply_unary_op to avoid RecursionError

* update recursion limit for nox session

* skip the test in e2e/python 3.12

* simplify code

* simplify code
  • Loading branch information
Genesis929 authored Dec 9, 2024
1 parent 3072d38 commit 9dcf1aa
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 54 deletions.
49 changes: 4 additions & 45 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,14 +912,17 @@ def multi_apply_unary_op(
input_varname = input_varnames[0]

block = self

result_ids = []
for col_id in columns:
label = self.col_id_to_label[col_id]
block, result_id = block.project_expr(
expr.bind_variables({input_varname: ex.deref(col_id)}),
label=label,
)
block = block.copy_values(result_id, col_id)
block = block.drop_columns([result_id])
result_ids.append(result_id)
block = block.drop_columns(result_ids)
# Special case, we can preserve transpose cache for full-frame unary ops
if (self._transpose_cache is not None) and set(self.value_columns) == set(
columns
Expand Down Expand Up @@ -1317,50 +1320,6 @@ def summarize(
index_columns=index_cols,
)

def calculate_pairwise_metric(self, op=agg_ops.CorrOp()):
    """
    Returns a block object to compute pairwise metrics among all value columns in this block.
    The metric to be computed is specified by the `op` parameter, which can be either a
    correlation operation (default) or a covariance operation.

    Raises:
        NotImplementedError: if the block has more than 30 value columns
            (the pairwise expansion below is quadratic in column count).
    """
    if len(self.value_columns) > 30:
        raise NotImplementedError(
            "This function supports dataframes with 30 columns or fewer. "
            f"Provided dataframe has {len(self.value_columns)} columns. {constants.FEEDBACK_LINK}"
        )

    # One binary aggregation per ordered (left, right) column pair — n^2 outputs.
    aggregations = [
        (
            ex.BinaryAggregation(op, ex.deref(left_col), ex.deref(right_col)),
            f"{left_col}-{right_col}",
        )
        for left_col in self.value_columns
        for right_col in self.value_columns
    ]
    expr = self.expr.aggregate(aggregations)

    # Slice the flat n^2 aggregation outputs into n groups of n columns so
    # unpivot can reshape them into an n x n matrix (one row per left column).
    input_count = len(self.value_columns)
    unpivot_columns = tuple(
        tuple(expr.column_ids[input_count * i : input_count * (i + 1)])
        for i in range(input_count)
    )
    labels = self._get_labels_for_columns(self.value_columns)

    # TODO(b/340896143): fix type error
    expr, (index_col_ids, _, _) = unpivot(
        expr,
        row_labels=labels,
        unpivot_columns=unpivot_columns,
    )

    # Result is square: same labels on the index and on the columns.
    return Block(
        expr,
        column_labels=self.column_labels,
        index_columns=index_col_ids,
        index_labels=self.column_labels.names,
    )

def explode(
self,
column_ids: typing.Sequence[str],
Expand Down
192 changes: 190 additions & 2 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1197,15 +1197,203 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr
else:
frame = self._drop_non_numeric()

return DataFrame(frame._block.calculate_pairwise_metric(op=agg_ops.CorrOp()))
orig_columns = frame.columns
# Replace column names with 0 to n - 1 to keep order
# and avoid the influence of duplicated column name
frame.columns = pandas.Index(range(len(orig_columns)))
frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE)
block = frame._block

# A new column that uniquely identifies each row
block, ordering_col = frame._block.promote_offsets(label="_bigframes_idx")

val_col_ids = [
col_id for col_id in block.value_columns if col_id != ordering_col
]

block = block.melt(
[ordering_col], val_col_ids, ["_bigframes_variable"], "_bigframes_value"
)

block = block.merge(
block,
left_join_ids=[ordering_col],
right_join_ids=[ordering_col],
how="inner",
sort=False,
)

frame = DataFrame(block).dropna(
subset=["_bigframes_value_x", "_bigframes_value_y"]
)

paired_mean_frame = (
frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
.agg(
_bigframes_paired_mean_x=bigframes.pandas.NamedAgg(
column="_bigframes_value_x", aggfunc="mean"
),
_bigframes_paired_mean_y=bigframes.pandas.NamedAgg(
column="_bigframes_value_y", aggfunc="mean"
),
)
.reset_index()
)

frame = frame.merge(
paired_mean_frame, on=["_bigframes_variable_x", "_bigframes_variable_y"]
)
frame["_bigframes_value_x"] -= frame["_bigframes_paired_mean_x"]
frame["_bigframes_value_y"] -= frame["_bigframes_paired_mean_y"]

frame["_bigframes_dividend"] = (
frame["_bigframes_value_x"] * frame["_bigframes_value_y"]
)
frame["_bigframes_x_square"] = (
frame["_bigframes_value_x"] * frame["_bigframes_value_x"]
)
frame["_bigframes_y_square"] = (
frame["_bigframes_value_y"] * frame["_bigframes_value_y"]
)

result = (
frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
.agg(
_bigframes_dividend_sum=bigframes.pandas.NamedAgg(
column="_bigframes_dividend", aggfunc="sum"
),
_bigframes_x_square_sum=bigframes.pandas.NamedAgg(
column="_bigframes_x_square", aggfunc="sum"
),
_bigframes_y_square_sum=bigframes.pandas.NamedAgg(
column="_bigframes_y_square", aggfunc="sum"
),
)
.reset_index()
)
result["_bigframes_corr"] = result["_bigframes_dividend_sum"] / (
(
result["_bigframes_x_square_sum"] * result["_bigframes_y_square_sum"]
)._apply_unary_op(ops.sqrt_op)
)
result = result._pivot(
index="_bigframes_variable_x",
columns="_bigframes_variable_y",
values="_bigframes_corr",
)

map_data = {
f"_bigframes_level_{i}": orig_columns.get_level_values(i)
for i in range(orig_columns.nlevels)
}
map_data["_bigframes_keys"] = range(len(orig_columns))
map_df = bigframes.dataframe.DataFrame(
map_data,
session=self._get_block().expr.session,
).set_index("_bigframes_keys")
result = result.join(map_df).sort_index()
index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)]
result = result.set_index(index_columns)
result.index.names = orig_columns.names
result.columns = orig_columns

return result

def cov(self, *, numeric_only: bool = False) -> DataFrame:
    """Compute the pairwise covariance of columns, excluding NA/null values.

    Args:
        numeric_only: When False, raise if any column is non-numeric; when
            True, silently drop non-numeric columns before computing.

    Returns:
        DataFrame: The covariance matrix, with both index and columns set to
        the original (possibly MultiIndex) column labels.
    """
    if not numeric_only:
        # Fixed: previously passed "corr" here, so the error message for a
        # non-numeric column misreported the operation name.
        frame = self._raise_on_non_numeric("cov")
    else:
        frame = self._drop_non_numeric()

    orig_columns = frame.columns
    # Replace column names with 0 to n - 1 to keep order
    # and avoid the influence of duplicated column name
    frame.columns = pandas.Index(range(len(orig_columns)))
    frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE)
    block = frame._block

    # A new column that uniquely identifies each row
    block, ordering_col = frame._block.promote_offsets(label="_bigframes_idx")

    val_col_ids = [
        col_id for col_id in block.value_columns if col_id != ordering_col
    ]

    # Melt to long format (one row per (row id, variable)), then self-join on
    # the row id so every pair of variables appears side by side (_x, _y).
    block = block.melt(
        [ordering_col], val_col_ids, ["_bigframes_variable"], "_bigframes_value"
    )
    block = block.merge(
        block,
        left_join_ids=[ordering_col],
        right_join_ids=[ordering_col],
        how="inner",
        sort=False,
    )

    # Pairwise NA handling: only rows where both values are present count.
    frame = DataFrame(block).dropna(
        subset=["_bigframes_value_x", "_bigframes_value_y"]
    )

    paired_mean_frame = (
        frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
        .agg(
            _bigframes_paired_mean_x=bigframes.pandas.NamedAgg(
                column="_bigframes_value_x", aggfunc="mean"
            ),
            _bigframes_paired_mean_y=bigframes.pandas.NamedAgg(
                column="_bigframes_value_y", aggfunc="mean"
            ),
        )
        .reset_index()
    )

    frame = frame.merge(
        paired_mean_frame, on=["_bigframes_variable_x", "_bigframes_variable_y"]
    )
    # Center each value on its pairwise mean before taking products.
    frame["_bigframes_value_x"] -= frame["_bigframes_paired_mean_x"]
    frame["_bigframes_value_y"] -= frame["_bigframes_paired_mean_y"]

    frame["_bigframes_dividend"] = (
        frame["_bigframes_value_x"] * frame["_bigframes_value_y"]
    )

    result = (
        frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
        .agg(
            _bigframes_dividend_sum=bigframes.pandas.NamedAgg(
                column="_bigframes_dividend", aggfunc="sum"
            ),
            _bigframes_dividend_count=bigframes.pandas.NamedAgg(
                column="_bigframes_dividend", aggfunc="count"
            ),
        )
        .reset_index()
    )
    # Sample covariance: sum of centered products / (pair count - 1).
    result["_bigframes_cov"] = result["_bigframes_dividend_sum"] / (
        result["_bigframes_dividend_count"] - 1
    )
    result = result._pivot(
        index="_bigframes_variable_x",
        columns="_bigframes_variable_y",
        values="_bigframes_cov",
    )

    # Map the temporary 0..n-1 labels back onto the original column labels,
    # preserving original order and any MultiIndex levels.
    map_data = {
        f"_bigframes_level_{i}": orig_columns.get_level_values(i)
        for i in range(orig_columns.nlevels)
    }
    map_data["_bigframes_keys"] = range(len(orig_columns))
    map_df = bigframes.dataframe.DataFrame(
        map_data,
        session=self._get_block().expr.session,
    ).set_index("_bigframes_keys")
    result = result.join(map_df).sort_index()
    index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)]
    result = result.set_index(index_columns)
    result.index.names = orig_columns.names
    result.columns = orig_columns

    return result

def to_arrow(
self,
Expand Down
22 changes: 22 additions & 0 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,28 @@ def scalars_dfs_maybe_ordered(
)


@pytest.fixture(scope="session")
def scalars_df_numeric_150_columns_maybe_ordered(
    maybe_ordered_session,
    scalars_pandas_df_index,
):
    """DataFrame pointing at test data."""
    # TODO(b/379911038): After the error is fixed, add numeric type.
    base_columns = [
        "rowindex",
        "rowindex_2",
        "float64_col",
        "int64_col",
        "int64_too",
    ]
    # Repeat the five numeric columns 30 times to build a 150-column frame.
    wide_pandas_df = scalars_pandas_df_index.reset_index(drop=False)[
        base_columns * 30
    ]
    wide_df = maybe_ordered_session.read_pandas(wide_pandas_df)
    return (wide_df, wide_pandas_df)


@pytest.fixture(scope="session")
def hockey_df(
hockey_table_id: str, session: bigframes.Session
Expand Down
42 changes: 42 additions & 0 deletions tests/system/large/test_dataframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import sys

import pandas as pd
import pytest


@pytest.mark.skipif(
    sys.version_info >= (3, 12),
    # See: https://github.com/python/cpython/issues/112282
    reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.",
)
def test_corr_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered):
    bf_df, pd_df = scalars_df_numeric_150_columns_maybe_ordered

    # Compare the BigFrames correlation matrix against pandas on a wide frame.
    expected = pd_df.corr(numeric_only=True)
    actual = bf_df.corr(numeric_only=True).to_pandas()

    pd.testing.assert_frame_equal(
        actual,
        expected,
        check_dtype=False,
        check_index_type=False,
        check_column_type=False,
    )


@pytest.mark.skipif(
    sys.version_info >= (3, 12),
    # See: https://github.com/python/cpython/issues/112282
    reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.",
)
def test_cov_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered):
    bf_df, pd_df = scalars_df_numeric_150_columns_maybe_ordered

    # Compare the BigFrames covariance matrix against pandas on a wide frame.
    expected = pd_df.cov(numeric_only=True)
    actual = bf_df.cov(numeric_only=True).to_pandas()

    pd.testing.assert_frame_equal(
        actual,
        expected,
        check_dtype=False,
        check_index_type=False,
        check_column_type=False,
    )
9 changes: 4 additions & 5 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2075,8 +2075,8 @@ def test_combine_first(
),
],
)
def test_corr_w_numeric_only(scalars_dfs, columns, numeric_only):
scalars_df, scalars_pandas_df = scalars_dfs
def test_corr_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only):
scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered

bf_result = scalars_df[columns].corr(numeric_only=numeric_only).to_pandas()
pd_result = scalars_pandas_df[columns].corr(numeric_only=numeric_only)
Expand Down Expand Up @@ -2115,11 +2115,10 @@ def test_corr_w_invalid_parameters(scalars_dfs):
),
],
)
def test_cov_w_numeric_only(scalars_dfs, columns, numeric_only):
scalars_df, scalars_pandas_df = scalars_dfs
def test_cov_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only):
scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered
bf_result = scalars_df[columns].cov(numeric_only=numeric_only).to_pandas()
pd_result = scalars_pandas_df[columns].cov(numeric_only=numeric_only)

# BigFrames and Pandas differ in their data type handling:
# - Column types: BigFrames uses Float64, Pandas uses float64.
# - Index types: BigFrames uses string, Pandas uses object.
Expand Down
8 changes: 6 additions & 2 deletions tests/system/small/test_multiindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,9 @@ def test_column_multi_index_unstack(scalars_df_index, scalars_pandas_df_index):

def test_corr_w_multi_index(scalars_df_index, scalars_pandas_df_index):
columns = ["int64_too", "float64_col", "int64_col"]
multi_columns = pandas.MultiIndex.from_tuples(zip(["a", "b", "b"], [1, 2, 2]))
multi_columns = pandas.MultiIndex.from_tuples(
zip(["a", "b", "b"], [1, 2, 2]), names=[None, "level_2"]
)

bf = scalars_df_index[columns].copy()
bf.columns = multi_columns
Expand All @@ -931,7 +933,9 @@ def test_corr_w_multi_index(scalars_df_index, scalars_pandas_df_index):

def test_cov_w_multi_index(scalars_df_index, scalars_pandas_df_index):
columns = ["int64_too", "float64_col", "int64_col"]
multi_columns = pandas.MultiIndex.from_tuples(zip(["a", "b", "b"], [1, 2, 2]))
multi_columns = pandas.MultiIndex.from_tuples(
zip(["a", "b", "b"], [1, 2, 2]), names=["level_1", None]
)

bf = scalars_df_index[columns].copy()
bf.columns = multi_columns
Expand Down

0 comments on commit 9dcf1aa

Please sign in to comment.