Skip to content

Commit

Permalink
feat: add Series.cov method (#368)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕
  • Loading branch information
TrevorBergeron authored Feb 7, 2024
1 parent 91596b8 commit 443db22
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 56 deletions.
22 changes: 0 additions & 22 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,28 +261,6 @@ def aggregate(
)
)

def corr_aggregate(
self, corr_aggregations: typing.Sequence[typing.Tuple[str, str, str]]
) -> ArrayValue:
"""
Get correlations between each lef_column_id and right_column_id, stored in the respective output_column_id.
This uses BigQuery's CORR under the hood, and thus only Pearson's method is used.
Arguments:
corr_aggregations: left_column_id, right_column_id, output_column_id tuples
"""
aggregations = tuple(
(
ex.BinaryAggregation(
agg_ops.CorrOp(), ex.free_var(agg[0]), ex.free_var(agg[1])
),
agg[2],
)
for agg in corr_aggregations
)
return ArrayValue(
nodes.AggregateNode(child=self.node, aggregations=aggregations)
)

def project_window_op(
self,
column_name: str,
Expand Down
19 changes: 11 additions & 8 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1040,26 +1040,29 @@ def get_stat(self, column_id: str, stat: agg_ops.UnaryAggregateOp):
self._stats_cache[column_id].update(stats_map)
return stats_map[stat.name]

def get_corr_stat(self, column_id_left: str, column_id_right: str):
def get_binary_stat(
self, column_id_left: str, column_id_right: str, stat: agg_ops.BinaryAggregateOp
):
# TODO(kemppeterson): Clean up the column names for DataFrames.corr support
# TODO(kemppeterson): Add a cache here.
corr_aggregations = [
aggregations = [
(
column_id_left,
column_id_right,
"corr_" + column_id_left + column_id_right,
ex.BinaryAggregation(
stat, ex.free_var(column_id_left), ex.free_var(column_id_right)
),
f"{stat.name}_{column_id_left}{column_id_right}",
)
]
expr = self.expr.corr_aggregate(corr_aggregations)
expr = self.expr.aggregate(aggregations)
offset_index_id = guid.generate_guid()
expr = expr.promote_offsets(offset_index_id)
block = Block(
expr,
index_columns=[offset_index_id],
column_labels=[a[2] for a in corr_aggregations],
column_labels=[a[1] for a in aggregations],
)
df, _ = block.to_pandas()
return df.loc[0, "corr_" + column_id_left + column_id_right]
return df.loc[0, f"{stat.name}_{column_id_left}{column_id_right}"]

def summarize(
self,
Expand Down
13 changes: 13 additions & 0 deletions bigframes/core/compile/aggregate_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,19 @@ def _(
return cast(ibis_types.NumericColumn, bq_corr)


@compile_binary_agg.register
def _(
op: agg_ops.CovOp, left: ibis_types.Column, right: ibis_types.Column, window=None
) -> ibis_types.NumericValue:
# Will be null if all inputs are null. Pandas defaults to zero sum though.
left_numeric = cast(ibis_types.NumericColumn, left)
right_numeric = cast(ibis_types.NumericColumn, right)
bq_cov = _apply_window_if_present(
left_numeric.cov(right_numeric, how="sample"), window
)
return cast(ibis_types.NumericColumn, bq_cov)


def _apply_window_if_present(value: ibis_types.Value, window):
return value.over(window) if (window is not None) else value

Expand Down
5 changes: 5 additions & 0 deletions bigframes/operations/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ class CorrOp(BinaryAggregateOp):
name: ClassVar[str] = "corr"


@dataclasses.dataclass(frozen=True)
class CovOp(BinaryAggregateOp):
name: ClassVar[str] = "cov"


sum_op = SumOp()
mean_op = MeanOp()
median_op = MedianOp()
Expand Down
7 changes: 5 additions & 2 deletions bigframes/operations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import bigframes.core.scalar as scalars
import bigframes.dtypes
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.series as series
import bigframes.session
import third_party.bigframes_vendored.pandas.pandas._typing as vendored_pandas_typing
Expand Down Expand Up @@ -188,10 +189,12 @@ def _apply_binary_op(
block, result_id = self._block.project_expr(expr, name)
return series.Series(block.select_column(result_id))

def _apply_corr_aggregation(self, other: series.Series) -> float:
def _apply_binary_aggregation(
self, other: series.Series, stat: agg_ops.BinaryAggregateOp
) -> float:
(left, right, block) = self._align(other, how="outer")

return block.get_corr_stat(left, right)
return block.get_binary_stat(left, right, stat)

def _align(self, other: series.Series, how="outer") -> tuple[str, str, blocks.Block]: # type: ignore
"""Aligns the series value with another scalar or series object. Returns new left column id, right column id and joined tabled expression."""
Expand Down
9 changes: 6 additions & 3 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,8 +734,8 @@ def round(self, decimals=0) -> "Series":
return self._apply_binary_op(decimals, ops.round_op)

def corr(self, other: Series, method="pearson", min_periods=None) -> float:
# TODO(kemppeterson): Validate early that both are numeric
# TODO(kemppeterson): Handle partially-numeric columns
# TODO(tbergeron): Validate early that both are numeric
# TODO(tbergeron): Handle partially-numeric columns
if method != "pearson":
raise NotImplementedError(
f"Only Pearson correlation is currently supported. {constants.FEEDBACK_LINK}"
Expand All @@ -744,7 +744,10 @@ def corr(self, other: Series, method="pearson", min_periods=None) -> float:
raise NotImplementedError(
f"min_periods not yet supported. {constants.FEEDBACK_LINK}"
)
return self._apply_corr_aggregation(other)
return self._apply_binary_aggregation(other, agg_ops.CorrOp())

def cov(self, other: Series) -> float:
return self._apply_binary_aggregation(other, agg_ops.CovOp())

def all(self) -> bool:
return typing.cast(bool, self._apply_aggregation(agg_ops.all_op))
Expand Down
13 changes: 12 additions & 1 deletion tests/system/small/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ def test_mods(scalars_dfs, col_x, col_y, method):

# We work around a pandas bug that doesn't handle correlating nullable dtypes by doing this
# manually with dumb self-correlation instead of parameterized as test_mods is above.
def test_corr(scalars_dfs):
def test_series_corr(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs
bf_result = scalars_df["int64_too"].corr(scalars_df["int64_too"])
pd_result = (
Expand All @@ -667,6 +667,17 @@ def test_corr(scalars_dfs):
assert math.isclose(pd_result, bf_result)


def test_series_cov(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs
bf_result = scalars_df["int64_too"].cov(scalars_df["int64_too"])
pd_result = (
scalars_pandas_df["int64_too"]
.astype("int64")
.cov(scalars_pandas_df["int64_too"].astype("int64"))
)
assert math.isclose(pd_result, bf_result)


@pytest.mark.parametrize(
("col_x",),
[
Expand Down
20 changes: 0 additions & 20 deletions tests/unit/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,23 +208,3 @@ def test_arrayvalue_to_ibis_expr_with_aggregate():
assert actual.columns[0] == "col1"
assert actual.columns[1] == "col4"
assert expr.columns[1].type().is_int64()


def test_arrayvalue_to_ibis_expr_with_corr_aggregate():
value = resources.create_arrayvalue(
pandas.DataFrame(
{
"col1": [1, 2, 3],
"col2": ["a", "b", "c"],
"col3": [0.1, 0.2, 0.3],
}
),
total_ordering_columns=["col1"],
)
expr = value.corr_aggregate(
corr_aggregations=[("col1", "col3", "col4")]
)._compile_ordered()
actual = expr._to_ibis_expr(ordering_mode="unordered")
assert len(expr.columns) == 1
assert actual.columns[0] == "col4"
assert expr.columns[0].type().is_float64()
21 changes: 21 additions & 0 deletions third_party/bigframes_vendored/pandas/core/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,27 @@ def corr(self, other, method="pearson", min_periods=None) -> float:
"""
raise NotImplementedError("abstract method")

def cov(
self,
other,
) -> float:
"""
Compute covariance with Series, excluding missing values.
The two `Series` objects are not required to be the same length and
will be aligned internally before the covariance is calculated.
Args:
other (Series):
Series with which to compute the covariance.
Returns:
float:
Covariance between Series and other normalized by N-1
(unbiased estimator).
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def diff(self) -> Series:
"""
First discrete difference of element.
Expand Down

0 comments on commit 443db22

Please sign in to comment.