diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index ea169dbb74..4dc2e4d7af 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -261,28 +261,6 @@ def aggregate( ) ) - def corr_aggregate( - self, corr_aggregations: typing.Sequence[typing.Tuple[str, str, str]] - ) -> ArrayValue: - """ - Get correlations between each lef_column_id and right_column_id, stored in the respective output_column_id. - This uses BigQuery's CORR under the hood, and thus only Pearson's method is used. - Arguments: - corr_aggregations: left_column_id, right_column_id, output_column_id tuples - """ - aggregations = tuple( - ( - ex.BinaryAggregation( - agg_ops.CorrOp(), ex.free_var(agg[0]), ex.free_var(agg[1]) - ), - agg[2], - ) - for agg in corr_aggregations - ) - return ArrayValue( - nodes.AggregateNode(child=self.node, aggregations=aggregations) - ) - def project_window_op( self, column_name: str, diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index e758e20335..34df7231cc 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1040,26 +1040,29 @@ def get_stat(self, column_id: str, stat: agg_ops.UnaryAggregateOp): self._stats_cache[column_id].update(stats_map) return stats_map[stat.name] - def get_corr_stat(self, column_id_left: str, column_id_right: str): + def get_binary_stat( + self, column_id_left: str, column_id_right: str, stat: agg_ops.BinaryAggregateOp + ): # TODO(kemppeterson): Clean up the column names for DataFrames.corr support # TODO(kemppeterson): Add a cache here. - corr_aggregations = [ + aggregations = [ ( - column_id_left, - column_id_right, - "corr_" + column_id_left + column_id_right, + ex.BinaryAggregation( + stat, ex.free_var(column_id_left), ex.free_var(column_id_right) + ), + f"{stat.name}_{column_id_left}{column_id_right}", ) ] - expr = self.expr.corr_aggregate(corr_aggregations) + expr = self.expr.aggregate(aggregations) offset_index_id = guid.generate_guid() expr = expr.promote_offsets(offset_index_id) block = Block( expr, index_columns=[offset_index_id], - column_labels=[a[2] for a in corr_aggregations], + column_labels=[a[1] for a in aggregations], ) df, _ = block.to_pandas() - return df.loc[0, "corr_" + column_id_left + column_id_right] + return df.loc[0, f"{stat.name}_{column_id_left}{column_id_right}"] def summarize( self, diff --git a/bigframes/core/compile/aggregate_compiler.py b/bigframes/core/compile/aggregate_compiler.py index 0dbc0e7310..1dad128599 100644 --- a/bigframes/core/compile/aggregate_compiler.py +++ b/bigframes/core/compile/aggregate_compiler.py @@ -431,6 +431,19 @@ def _( return cast(ibis_types.NumericColumn, bq_corr) +@compile_binary_agg.register +def _( + op: agg_ops.CovOp, left: ibis_types.Column, right: ibis_types.Column, window=None +) -> ibis_types.NumericValue: + # Will be null if all inputs are null. Pandas defaults to zero sum though. + left_numeric = cast(ibis_types.NumericColumn, left) + right_numeric = cast(ibis_types.NumericColumn, right) + bq_cov = _apply_window_if_present( + left_numeric.cov(right_numeric, how="sample"), window + ) + return cast(ibis_types.NumericColumn, bq_cov) + + def _apply_window_if_present(value: ibis_types.Value, window): return value.over(window) if (window is not None) else value diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index aed05e287b..6301ece865 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -265,6 +265,11 @@ class CorrOp(BinaryAggregateOp): name: ClassVar[str] = "corr" +@dataclasses.dataclass(frozen=True) +class CovOp(BinaryAggregateOp): + name: ClassVar[str] = "cov" + + sum_op = SumOp() mean_op = MeanOp() median_op = MedianOp() diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py index 4aad9479e7..04114b43cb 100644 --- a/bigframes/operations/base.py +++ b/bigframes/operations/base.py @@ -25,6 +25,7 @@ import bigframes.core.scalar as scalars import bigframes.dtypes import bigframes.operations as ops +import bigframes.operations.aggregations as agg_ops import bigframes.series as series import bigframes.session import third_party.bigframes_vendored.pandas.pandas._typing as vendored_pandas_typing @@ -188,10 +189,12 @@ def _apply_binary_op( block, result_id = self._block.project_expr(expr, name) return series.Series(block.select_column(result_id)) - def _apply_corr_aggregation(self, other: series.Series) -> float: + def _apply_binary_aggregation( + self, other: series.Series, stat: agg_ops.BinaryAggregateOp + ) -> float: (left, right, block) = self._align(other, how="outer") - return block.get_corr_stat(left, right) + return block.get_binary_stat(left, right, stat) def _align(self, other: series.Series, how="outer") -> tuple[str, str, blocks.Block]: # type: ignore """Aligns the series value with another scalar or series object. Returns new left column id, right column id and joined tabled expression.""" diff --git a/bigframes/series.py b/bigframes/series.py index 6167ce0966..753e195e0a 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -734,8 +734,8 @@ def round(self, decimals=0) -> "Series": return self._apply_binary_op(decimals, ops.round_op) def corr(self, other: Series, method="pearson", min_periods=None) -> float: - # TODO(kemppeterson): Validate early that both are numeric - # TODO(kemppeterson): Handle partially-numeric columns + # TODO(tbergeron): Validate early that both are numeric + # TODO(tbergeron): Handle partially-numeric columns if method != "pearson": raise NotImplementedError( f"Only Pearson correlation is currently supported. {constants.FEEDBACK_LINK}" @@ -744,7 +744,10 @@ def corr(self, other: Series, method="pearson", min_periods=None) -> float: raise NotImplementedError( f"min_periods not yet supported. {constants.FEEDBACK_LINK}" ) - return self._apply_corr_aggregation(other) + return self._apply_binary_aggregation(other, agg_ops.CorrOp()) + + def cov(self, other: Series) -> float: + return self._apply_binary_aggregation(other, agg_ops.CovOp()) def all(self) -> bool: return typing.cast(bool, self._apply_aggregation(agg_ops.all_op)) diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 5d8fb0b29c..f2790d190a 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -656,7 +656,7 @@ def test_mods(scalars_dfs, col_x, col_y, method): # We work around a pandas bug that doesn't handle correlating nullable dtypes by doing this # manually with dumb self-correlation instead of parameterized as test_mods is above. -def test_corr(scalars_dfs): +def test_series_corr(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs bf_result = scalars_df["int64_too"].corr(scalars_df["int64_too"]) pd_result = ( @@ -667,6 +667,17 @@ def test_corr(scalars_dfs): assert math.isclose(pd_result, bf_result) +def test_series_cov(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + bf_result = scalars_df["int64_too"].cov(scalars_df["int64_too"]) + pd_result = ( + scalars_pandas_df["int64_too"] + .astype("int64") + .cov(scalars_pandas_df["int64_too"].astype("int64")) + ) + assert math.isclose(pd_result, bf_result) + + @pytest.mark.parametrize( ("col_x",), [ diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py index a7f45efc85..42cbcbbc9f 100644 --- a/tests/unit/test_core.py +++ b/tests/unit/test_core.py @@ -208,23 +208,3 @@ def test_arrayvalue_to_ibis_expr_with_aggregate(): assert actual.columns[0] == "col1" assert actual.columns[1] == "col4" assert expr.columns[1].type().is_int64() - - -def test_arrayvalue_to_ibis_expr_with_corr_aggregate(): - value = resources.create_arrayvalue( - pandas.DataFrame( - { - "col1": [1, 2, 3], - "col2": ["a", "b", "c"], - "col3": [0.1, 0.2, 0.3], - } - ), - total_ordering_columns=["col1"], - ) - expr = value.corr_aggregate( - corr_aggregations=[("col1", "col3", "col4")] - )._compile_ordered() - actual = expr._to_ibis_expr(ordering_mode="unordered") - assert len(expr.columns) == 1 - assert actual.columns[0] == "col4" - assert expr.columns[0].type().is_float64() diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index 9e376c713e..4232d3ec2a 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -842,6 +842,27 @@ def corr(self, other, method="pearson", min_periods=None) -> float: """ raise NotImplementedError("abstract method") + def cov( + self, + other, + ) -> float: + """ + Compute covariance with Series, excluding missing values. + + The two `Series` objects are not required to be the same length and + will be aligned internally before the covariance is calculated. + + Args: + other (Series): + Series with which to compute the covariance. + + Returns: + float: + Covariance between Series and other normalized by N-1 + (unbiased estimator). + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def diff(self) -> Series: """ First discrete difference of element.