From 9dcf1aa918919704dcf4d12b05935b22fb502fc6 Mon Sep 17 00:00:00 2001
From: Huan Chen <142538604+Genesis929@users.noreply.github.com>
Date: Mon, 9 Dec 2024 10:36:05 -0800
Subject: [PATCH] perf: update df.corr, df.cov to be used with more than 30
 columns case. (#1161)

* perf: update df.corr, df.cov to be used with more than 30 columns case.
* add large test
* remove print
* fix_index
* fix index
* test fix
* fix test
* fix test
* slightly improve multi_apply_unary_op to avoid RecursionError
* update recursion limit for nox session
* skip the test in e2e/python 3.12
* simplify code
* simplify code
---
 bigframes/core/blocks.py              |  49 +------
 bigframes/dataframe.py                | 192 +++++++++++++++++++++++++-
 tests/system/conftest.py              |  22 +++
 tests/system/large/test_dataframe.py  |  42 ++++++
 tests/system/small/test_dataframe.py  |   9 +-
 tests/system/small/test_multiindex.py |   8 +-
 6 files changed, 268 insertions(+), 54 deletions(-)
 create mode 100644 tests/system/large/test_dataframe.py

diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 574bed00eb..a9139dfee6 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -912,6 +912,8 @@ def multi_apply_unary_op(
         input_varname = input_varnames[0]

         block = self
+
+        result_ids = []
         for col_id in columns:
             label = self.col_id_to_label[col_id]
             block, result_id = block.project_expr(
@@ -919,7 +921,8 @@
                 label=label,
             )
             block = block.copy_values(result_id, col_id)
-            block = block.drop_columns([result_id])
+            result_ids.append(result_id)
+        block = block.drop_columns(result_ids)

         # Special case, we can preserve transpose cache for full-frame unary ops
         if (self._transpose_cache is not None) and set(self.value_columns) == set(
             columns
@@ -1317,50 +1320,6 @@ def summarize(
             index_columns=index_cols,
         )

-    def calculate_pairwise_metric(self, op=agg_ops.CorrOp()):
-        """
-        Returns a block object to compute pairwise metrics among all value columns in this block.
-
-        The metric to be computed is specified by the `op` parameter, which can be either a
-        correlation operation (default) or a covariance operation.
-        """
-        if len(self.value_columns) > 30:
-            raise NotImplementedError(
-                "This function supports dataframes with 30 columns or fewer. "
-                f"Provided dataframe has {len(self.value_columns)} columns. "
-                f"{constants.FEEDBACK_LINK}"
-            )
-
-        aggregations = [
-            (
-                ex.BinaryAggregation(op, ex.deref(left_col), ex.deref(right_col)),
-                f"{left_col}-{right_col}",
-            )
-            for left_col in self.value_columns
-            for right_col in self.value_columns
-        ]
-        expr = self.expr.aggregate(aggregations)
-
-        input_count = len(self.value_columns)
-        unpivot_columns = tuple(
-            tuple(expr.column_ids[input_count * i : input_count * (i + 1)])
-            for i in range(input_count)
-        )
-        labels = self._get_labels_for_columns(self.value_columns)
-
-        # TODO(b/340896143): fix type error
-        expr, (index_col_ids, _, _) = unpivot(
-            expr,
-            row_labels=labels,
-            unpivot_columns=unpivot_columns,
-        )
-
-        return Block(
-            expr,
-            column_labels=self.column_labels,
-            index_columns=index_col_ids,
-            index_labels=self.column_labels.names,
-        )
-
     def explode(
         self,
         column_ids: typing.Sequence[str],
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 185a756fed..bedc13ecb3 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -1197,7 +1197,107 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFrame:
         else:
             frame = self._drop_non_numeric()

-        return DataFrame(frame._block.calculate_pairwise_metric(op=agg_ops.CorrOp()))
+        orig_columns = frame.columns
+        # Replace column names with 0 to n - 1 to keep order
+        # and avoid the influence of duplicated column names
+        frame.columns = pandas.Index(range(len(orig_columns)))
+        frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE)
+        block = frame._block
+
+        # A new column that uniquely identifies each row
+        block, ordering_col = frame._block.promote_offsets(label="_bigframes_idx")
+
+        val_col_ids = [
+            col_id for col_id in block.value_columns if col_id != ordering_col
+        ]
+
+        block = block.melt(
+            [ordering_col], val_col_ids, ["_bigframes_variable"], "_bigframes_value"
+        )
+
+        block = block.merge(
+            block,
+            left_join_ids=[ordering_col],
+            right_join_ids=[ordering_col],
+            how="inner",
+            sort=False,
+        )
+
+        frame = DataFrame(block).dropna(
+            subset=["_bigframes_value_x", "_bigframes_value_y"]
+        )
+
+        paired_mean_frame = (
+            frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
+            .agg(
+                _bigframes_paired_mean_x=bigframes.pandas.NamedAgg(
+                    column="_bigframes_value_x", aggfunc="mean"
+                ),
+                _bigframes_paired_mean_y=bigframes.pandas.NamedAgg(
+                    column="_bigframes_value_y", aggfunc="mean"
+                ),
+            )
+            .reset_index()
+        )
+
+        frame = frame.merge(
+            paired_mean_frame, on=["_bigframes_variable_x", "_bigframes_variable_y"]
+        )
+        frame["_bigframes_value_x"] -= frame["_bigframes_paired_mean_x"]
+        frame["_bigframes_value_y"] -= frame["_bigframes_paired_mean_y"]
+
+        frame["_bigframes_dividend"] = (
+            frame["_bigframes_value_x"] * frame["_bigframes_value_y"]
+        )
+        frame["_bigframes_x_square"] = (
+            frame["_bigframes_value_x"] * frame["_bigframes_value_x"]
+        )
+        frame["_bigframes_y_square"] = (
+            frame["_bigframes_value_y"] * frame["_bigframes_value_y"]
+        )
+
+        result = (
+            frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
+            .agg(
+                _bigframes_dividend_sum=bigframes.pandas.NamedAgg(
+                    column="_bigframes_dividend", aggfunc="sum"
+                ),
+                _bigframes_x_square_sum=bigframes.pandas.NamedAgg(
+                    column="_bigframes_x_square", aggfunc="sum"
+                ),
+                _bigframes_y_square_sum=bigframes.pandas.NamedAgg(
+                    column="_bigframes_y_square", aggfunc="sum"
+                ),
+            )
+            .reset_index()
+        )
+        result["_bigframes_corr"] = result["_bigframes_dividend_sum"] / (
+            (
+                result["_bigframes_x_square_sum"] * result["_bigframes_y_square_sum"]
+            )._apply_unary_op(ops.sqrt_op)
+        )
+        result = result._pivot(
+            index="_bigframes_variable_x",
+            columns="_bigframes_variable_y",
+            values="_bigframes_corr",
+        )
+
+        map_data = {
+            f"_bigframes_level_{i}": orig_columns.get_level_values(i)
+            for i in range(orig_columns.nlevels)
+        }
+        map_data["_bigframes_keys"] = range(len(orig_columns))
+        map_df = bigframes.dataframe.DataFrame(
+            map_data,
+            session=self._get_block().expr.session,
+        ).set_index("_bigframes_keys")
+        result = result.join(map_df).sort_index()
+        index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)]
+        result = result.set_index(index_columns)
+        result.index.names = orig_columns.names
+        result.columns = orig_columns
+
+        return result

     def cov(self, *, numeric_only: bool = False) -> DataFrame:
         if not numeric_only:
@@ -1205,7 +1305,95 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame:
         else:
             frame = self._drop_non_numeric()

-        return DataFrame(frame._block.calculate_pairwise_metric(agg_ops.CovOp()))
+        orig_columns = frame.columns
+        # Replace column names with 0 to n - 1 to keep order
+        # and avoid the influence of duplicated column names
+        frame.columns = pandas.Index(range(len(orig_columns)))
+        frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE)
+        block = frame._block
+
+        # A new column that uniquely identifies each row
+        block, ordering_col = frame._block.promote_offsets(label="_bigframes_idx")
+
+        val_col_ids = [
+            col_id for col_id in block.value_columns if col_id != ordering_col
+        ]
+
+        block = block.melt(
+            [ordering_col], val_col_ids, ["_bigframes_variable"], "_bigframes_value"
+        )
+        block = block.merge(
+            block,
+            left_join_ids=[ordering_col],
+            right_join_ids=[ordering_col],
+            how="inner",
+            sort=False,
+        )
+
+        frame = DataFrame(block).dropna(
+            subset=["_bigframes_value_x", "_bigframes_value_y"]
+        )
+
+        paired_mean_frame = (
+            frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
+            .agg(
+                _bigframes_paired_mean_x=bigframes.pandas.NamedAgg(
+                    column="_bigframes_value_x", aggfunc="mean"
+                ),
+                _bigframes_paired_mean_y=bigframes.pandas.NamedAgg(
+                    column="_bigframes_value_y", aggfunc="mean"
+                ),
+            )
+            .reset_index()
+        )
+
+        frame = frame.merge(
+            paired_mean_frame, on=["_bigframes_variable_x", "_bigframes_variable_y"]
+        )
+        frame["_bigframes_value_x"] -= frame["_bigframes_paired_mean_x"]
+        frame["_bigframes_value_y"] -= frame["_bigframes_paired_mean_y"]
+
+        frame["_bigframes_dividend"] = (
+            frame["_bigframes_value_x"] * frame["_bigframes_value_y"]
+        )
+
+        result = (
+            frame.groupby(["_bigframes_variable_x", "_bigframes_variable_y"])
+            .agg(
+                _bigframes_dividend_sum=bigframes.pandas.NamedAgg(
+                    column="_bigframes_dividend", aggfunc="sum"
+                ),
+                _bigframes_dividend_count=bigframes.pandas.NamedAgg(
+                    column="_bigframes_dividend", aggfunc="count"
+                ),
+            )
+            .reset_index()
+        )
+        result["_bigframes_cov"] = result["_bigframes_dividend_sum"] / (
+            result["_bigframes_dividend_count"] - 1
+        )
+        result = result._pivot(
+            index="_bigframes_variable_x",
+            columns="_bigframes_variable_y",
+            values="_bigframes_cov",
+        )
+
+        map_data = {
+            f"_bigframes_level_{i}": orig_columns.get_level_values(i)
+            for i in range(orig_columns.nlevels)
+        }
+        map_data["_bigframes_keys"] = range(len(orig_columns))
+        map_df = bigframes.dataframe.DataFrame(
+            map_data,
+            session=self._get_block().expr.session,
+        ).set_index("_bigframes_keys")
+        result = result.join(map_df).sort_index()
+        index_columns = [f"_bigframes_level_{i}" for i in range(orig_columns.nlevels)]
+        result = result.set_index(index_columns)
+        result.index.names = orig_columns.names
+        result.columns = orig_columns
+
+        return result

     def to_arrow(
         self,
diff --git a/tests/system/conftest.py b/tests/system/conftest.py
index fd6d7a80b0..e1cbf02780 100644
--- a/tests/system/conftest.py
+++ b/tests/system/conftest.py
@@ -611,6 +611,28 @@ def scalars_dfs_maybe_ordered(
     )


+@pytest.fixture(scope="session")
+def scalars_df_numeric_150_columns_maybe_ordered(
+    maybe_ordered_session,
+    scalars_pandas_df_index,
+):
+    """DataFrame pointing at test data."""
+    # TODO(b/379911038): After the error is fixed, add numeric type.
+    pandas_df = scalars_pandas_df_index.reset_index(drop=False)[
+        [
+            "rowindex",
+            "rowindex_2",
+            "float64_col",
+            "int64_col",
+            "int64_too",
+        ]
+        * 30
+    ]
+
+    df = maybe_ordered_session.read_pandas(pandas_df)
+    return (df, pandas_df)
+
+
 @pytest.fixture(scope="session")
 def hockey_df(
     hockey_table_id: str, session: bigframes.Session
diff --git a/tests/system/large/test_dataframe.py b/tests/system/large/test_dataframe.py
new file mode 100644
index 0000000000..20d383463a
--- /dev/null
+++ b/tests/system/large/test_dataframe.py
@@ -0,0 +1,42 @@
+import sys
+
+import pandas as pd
+import pytest
+
+
+@pytest.mark.skipif(
+    sys.version_info >= (3, 12),
+    # See: https://github.com/python/cpython/issues/112282
+    reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.",
+)
+def test_corr_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered):
+    scalars_df, scalars_pandas_df = scalars_df_numeric_150_columns_maybe_ordered
+    bf_result = scalars_df.corr(numeric_only=True).to_pandas()
+    pd_result = scalars_pandas_df.corr(numeric_only=True)
+
+    pd.testing.assert_frame_equal(
+        bf_result,
+        pd_result,
+        check_dtype=False,
+        check_index_type=False,
+        check_column_type=False,
+    )
+
+
+@pytest.mark.skipif(
+    sys.version_info >= (3, 12),
+    # See: https://github.com/python/cpython/issues/112282
+    reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.",
+)
+def test_cov_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered):
+    scalars_df, scalars_pandas_df = scalars_df_numeric_150_columns_maybe_ordered
+    bf_result = scalars_df.cov(numeric_only=True).to_pandas()
+    pd_result = scalars_pandas_df.cov(numeric_only=True)
+
+    pd.testing.assert_frame_equal(
+        bf_result,
+        pd_result,
+        check_dtype=False,
+        check_index_type=False,
+        check_column_type=False,
+    )
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index 12ca13eb80..eacec66b2f 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -2075,8 +2075,8 @@ def test_combine_first(
         ),
     ],
 )
-def test_corr_w_numeric_only(scalars_dfs, columns, numeric_only):
-    scalars_df, scalars_pandas_df = scalars_dfs
+def test_corr_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only):
+    scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered
     bf_result = scalars_df[columns].corr(numeric_only=numeric_only).to_pandas()
     pd_result = scalars_pandas_df[columns].corr(numeric_only=numeric_only)

@@ -2115,11 +2115,10 @@ def test_corr_w_invalid_parameters(scalars_dfs):
         ),
     ],
 )
-def test_cov_w_numeric_only(scalars_dfs, columns, numeric_only):
-    scalars_df, scalars_pandas_df = scalars_dfs
+def test_cov_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only):
+    scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered
     bf_result = scalars_df[columns].cov(numeric_only=numeric_only).to_pandas()
     pd_result = scalars_pandas_df[columns].cov(numeric_only=numeric_only)
-
     # BigFrames and Pandas differ in their data type handling:
     # - Column types: BigFrames uses Float64, Pandas uses float64.
     # - Index types: BigFrames uses string, Pandas uses object.
diff --git a/tests/system/small/test_multiindex.py b/tests/system/small/test_multiindex.py
index cab74f617d..1c78ac63d9 100644
--- a/tests/system/small/test_multiindex.py
+++ b/tests/system/small/test_multiindex.py
@@ -910,7 +910,9 @@ def test_column_multi_index_unstack(scalars_df_index, scalars_pandas_df_index):

 def test_corr_w_multi_index(scalars_df_index, scalars_pandas_df_index):
     columns = ["int64_too", "float64_col", "int64_col"]
-    multi_columns = pandas.MultiIndex.from_tuples(zip(["a", "b", "b"], [1, 2, 2]))
+    multi_columns = pandas.MultiIndex.from_tuples(
+        zip(["a", "b", "b"], [1, 2, 2]), names=[None, "level_2"]
+    )

     bf = scalars_df_index[columns].copy()
     bf.columns = multi_columns
@@ -931,7 +933,9 @@ def test_cov_w_multi_index(scalars_df_index, scalars_pandas_df_index):
     columns = ["int64_too", "float64_col", "int64_col"]
-    multi_columns = pandas.MultiIndex.from_tuples(zip(["a", "b", "b"], [1, 2, 2]))
+    multi_columns = pandas.MultiIndex.from_tuples(
+        zip(["a", "b", "b"], [1, 2, 2]), names=["level_1", None]
+    )

     bf = scalars_df_index[columns].copy()
     bf.columns = multi_columns
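
Note on the approach: the deleted Block.calculate_pairwise_metric built one
aggregation per column pair, so the generated query held n^2 expressions and
had to be capped at 30 columns. The new corr/cov paths instead melt the frame
into (row id, variable, value) triples, self-join on the row id to enumerate
pairs, center each pair by its own paired means, aggregate sums of products
per pair, and pivot back to an n x n matrix. Per pair (x, y), Pearson
correlation is sum((x - mx) * (y - my)) / sqrt(sum((x - mx)^2) * sum((y - my)^2)),
and covariance swaps the denominator for (count - 1). Below is a minimal
pandas-only sketch of that shape, not the BigFrames implementation itself: the
toy DataFrame and the short variable/value/dividend names are illustrative
stand-ins for the _bigframes_* intermediate columns.

    import numpy as np
    import pandas as pd

    rng = np.random.default_rng(0)
    df = pd.DataFrame(rng.normal(size=(100, 4)), columns=list("abcd"))

    # 1. Long format: one row per (row id, variable, value) triple.
    long = df.reset_index().melt(
        id_vars="index", var_name="variable", value_name="value"
    )

    # 2. Self-join on the row id so each output row carries an (x, y) pair.
    pairs = long.merge(long, on="index", suffixes=("_x", "_y"))
    pairs = pairs.dropna(subset=["value_x", "value_y"])

    # 3. Center each pair by its own per-pair means (pairwise NaN handling).
    means = pairs.groupby(["variable_x", "variable_y"])[
        ["value_x", "value_y"]
    ].transform("mean")
    pairs["value_x"] -= means["value_x"]
    pairs["value_y"] -= means["value_y"]

    # 4. Grouped sums of products give the Pearson numerator and denominator.
    pairs["dividend"] = pairs["value_x"] * pairs["value_y"]
    pairs["x_square"] = pairs["value_x"] ** 2
    pairs["y_square"] = pairs["value_y"] ** 2
    sums = pairs.groupby(["variable_x", "variable_y"])[
        ["dividend", "x_square", "y_square"]
    ].sum()
    corr_long = sums["dividend"] / np.sqrt(sums["x_square"] * sums["y_square"])
    # For cov, divide sums["dividend"] by the per-pair count minus 1 instead.

    # 5. Pivot the pair index back out to an n x n matrix.
    corr = corr_long.unstack("variable_y")

    pd.testing.assert_frame_equal(corr, df.corr(), check_names=False)

Since every step is a melt, join, group-by, or pivot, the generated SQL should
grow roughly linearly with the number of columns, which is presumably why the
PR could drop the 30-column NotImplementedError and only needed the
recursion-limit and Python 3.12 skip tweaks in the test harness for very wide
frames.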