Skip to content

Commit

Permalink
fix: fix DataFrameGroupBy.agg() issue with as_index=False (#273)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #271 🦕
  • Loading branch information
TrevorBergeron authored Dec 19, 2023
1 parent 5092215 commit ab49350
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 66 deletions.
1 change: 0 additions & 1 deletion bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ def value_counts(
by_column_ids=columns,
aggregations=[(dummy, agg_ops.count_op)],
dropna=dropna,
as_index=True,
)
count_id = agg_ids[0]
if normalize:
Expand Down
53 changes: 16 additions & 37 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
_MONOTONIC_DECREASING = "monotonic_decreasing"


LevelType = typing.Union[str, int]
LevelType = typing.Hashable
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]


Expand Down Expand Up @@ -941,7 +941,6 @@ def aggregate(
by_column_ids: typing.Sequence[str] = (),
aggregations: typing.Sequence[typing.Tuple[str, agg_ops.AggregateOp]] = (),
*,
as_index: bool = True,
dropna: bool = True,
) -> typing.Tuple[Block, typing.Sequence[str]]:
"""
Expand All @@ -962,40 +961,21 @@ def aggregate(
aggregate_labels = self._get_labels_for_columns(
[agg[0] for agg in aggregations]
)
if as_index:
names: typing.List[Label] = []
for by_col_id in by_column_ids:
if by_col_id in self.value_columns:
names.append(self.col_id_to_label[by_col_id])
else:
names.append(self.col_id_to_index_name[by_col_id])
return (
Block(
result_expr,
index_columns=by_column_ids,
column_labels=aggregate_labels,
index_labels=names,
),
output_col_ids,
)
else: # as_index = False
# If as_index=False, drop grouping levels, but keep grouping value columns
by_value_columns = [
col for col in by_column_ids if col in self.value_columns
]
by_column_labels = self._get_labels_for_columns(by_value_columns)
labels = (*by_column_labels, *aggregate_labels)
offsets_id = guid.generate_guid()
result_expr_pruned = result_expr.select_columns(
[*by_value_columns, *output_col_ids]
).promote_offsets(offsets_id)

return (
Block(
result_expr_pruned, index_columns=[offsets_id], column_labels=labels
),
output_col_ids,
)
names: typing.List[Label] = []
for by_col_id in by_column_ids:
if by_col_id in self.value_columns:
names.append(self.col_id_to_label[by_col_id])
else:
names.append(self.col_id_to_index_name[by_col_id])
return (
Block(
result_expr,
index_columns=by_column_ids,
column_labels=aggregate_labels,
index_labels=names,
),
output_col_ids,
)

def get_stat(self, column_id: str, stat: agg_ops.AggregateOp):
"""Gets aggregates immediately, and caches it"""
Expand Down Expand Up @@ -1324,7 +1304,6 @@ def pivot(
result_block, _ = block.aggregate(
by_column_ids=self.index_columns,
aggregations=aggregations,
as_index=True,
dropna=True,
)

Expand Down
30 changes: 20 additions & 10 deletions bigframes/core/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,10 @@ def _agg_string(self, func: str) -> df.DataFrame:
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
return df.DataFrame(agg_block)
dataframe = df.DataFrame(agg_block)
return dataframe if self._as_index else self._convert_index(dataframe)

def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
aggregations: typing.List[typing.Tuple[str, agg_ops.AggregateOp]] = []
Expand All @@ -285,7 +285,6 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
if want_aggfunc_level:
Expand All @@ -297,7 +296,8 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
)
else:
agg_block = agg_block.with_column_labels(pd.Index(column_labels))
return df.DataFrame(agg_block)
dataframe = df.DataFrame(agg_block)
return dataframe if self._as_index else self._convert_index(dataframe)

def _agg_list(self, func: typing.Sequence) -> df.DataFrame:
aggregations = [
Expand All @@ -311,15 +311,15 @@ def _agg_list(self, func: typing.Sequence) -> df.DataFrame:
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
agg_block = agg_block.with_column_labels(
pd.MultiIndex.from_tuples(
column_labels, names=[*self._block.column_labels.names, None]
)
)
return df.DataFrame(agg_block)
dataframe = df.DataFrame(agg_block)
return dataframe if self._as_index else self._convert_index(dataframe)

def _agg_named(self, **kwargs) -> df.DataFrame:
aggregations = []
Expand All @@ -339,11 +339,21 @@ def _agg_named(self, **kwargs) -> df.DataFrame:
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
agg_block = agg_block.with_column_labels(column_labels)
return df.DataFrame(agg_block)
dataframe = df.DataFrame(agg_block)
return dataframe if self._as_index else self._convert_index(dataframe)

def _convert_index(self, dataframe: df.DataFrame):
"""Convert index levels to columns except where names conflict."""
levels_to_drop = [
level for level in dataframe.index.names if level in dataframe.columns
]

if len(levels_to_drop) == dataframe.index.nlevels:
return dataframe.reset_index(drop=True)
return dataframe.droplevel(levels_to_drop).reset_index(drop=False)

aggregate = agg

Expand Down Expand Up @@ -379,10 +389,10 @@ def _aggregate_all(
result_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
return df.DataFrame(result_block)
dataframe = df.DataFrame(result_block)
return dataframe if self._as_index else self._convert_index(dataframe)

def _apply_window_op(
self,
Expand Down
8 changes: 4 additions & 4 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
# TODO(tbergeron): Convert to bytes-based limit
MAX_INLINE_DF_SIZE = 5000

LevelType = typing.Union[str, int]
LevelType = typing.Hashable
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
SingleItemValue = Union[bigframes.series.Series, int, float, Callable]

Expand Down Expand Up @@ -1956,7 +1956,7 @@ def _stack_mono(self):

def _stack_multi(self, level: LevelsType = -1):
n_levels = self.columns.nlevels
if isinstance(level, int) or isinstance(level, str):
if not utils.is_list_like(level):
level = [level]
level_indices = []
for level_ref in level:
Expand All @@ -1966,7 +1966,7 @@ def _stack_multi(self, level: LevelsType = -1):
else:
level_indices.append(level_ref)
else: # str
level_indices.append(self.columns.names.index(level_ref))
level_indices.append(self.columns.names.index(level_ref)) # type: ignore

new_order = [
*[i for i in range(n_levels) if i not in level_indices],
Expand All @@ -1982,7 +1982,7 @@ def _stack_multi(self, level: LevelsType = -1):
return DataFrame(block)

def unstack(self, level: LevelsType = -1):
if isinstance(level, int) or isinstance(level, str):
if not utils.is_list_like(level):
level = [level]

block = self._block
Expand Down
16 changes: 8 additions & 8 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,6 @@ def mode(self) -> Series:
block, agg_ids = block.aggregate(
by_column_ids=[self._value_column],
aggregations=((self._value_column, agg_ops.count_op),),
as_index=False,
)
value_count_col_id = agg_ids[0]
block, max_value_count_col_id = block.apply_window_op(
Expand All @@ -855,14 +854,15 @@ def mode(self) -> Series:
ops.eq_op,
)
block = block.filter(is_mode_col_id)
mode_values_series = Series(
block.select_column(self._value_column).assign_label(
self._value_column, self.name
)
)
return typing.cast(
Series, mode_values_series.sort_values().reset_index(drop=True)
# use temporary name for reset_index to avoid collision, restore after dropping extra columns
block = (
block.with_index_labels(["mode_temp_internal"])
.order_by([OrderingColumnReference(self._value_column)])
.reset_index(drop=False)
)
block = block.select_column(self._value_column).with_column_labels([self.name])
mode_values_series = Series(block.select_column(self._value_column))
return typing.cast(Series, mode_values_series)

def mean(self) -> float:
return typing.cast(float, self._apply_aggregation(agg_ops.mean_op))
Expand Down
17 changes: 13 additions & 4 deletions tests/system/small/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,23 +122,32 @@ def test_dataframe_groupby_agg_list(scalars_df_index, scalars_pandas_df_index):
pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False)


@pytest.mark.parametrize(
("as_index"),
[
(True),
(False),
],
)
def test_dataframe_groupby_agg_dict_with_list(
scalars_df_index, scalars_pandas_df_index
scalars_df_index, scalars_pandas_df_index, as_index
):
col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"]
bf_result = (
scalars_df_index[col_names]
.groupby("string_col")
.groupby("string_col", as_index=as_index)
.agg({"int64_too": ["mean", "max"], "string_col": "count"})
)
pd_result = (
scalars_pandas_df_index[col_names]
.groupby("string_col")
.groupby("string_col", as_index=as_index)
.agg({"int64_too": ["mean", "max"], "string_col": "count"})
)
bf_result_computed = bf_result.to_pandas()

pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False)
pd.testing.assert_frame_equal(
pd_result, bf_result_computed, check_dtype=False, check_index_type=False
)


def test_dataframe_groupby_agg_dict_no_lists(scalars_df_index, scalars_pandas_df_index):
Expand Down
11 changes: 9 additions & 2 deletions tests/system/small/test_multiindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,17 +356,24 @@ def test_multi_index_dataframe_groupby(scalars_df_index, scalars_pandas_df_index
def test_multi_index_dataframe_groupby_level_aggregate(
scalars_df_index, scalars_pandas_df_index, level, as_index
):
index_cols = ["int64_too", "bool_col"]
bf_result = (
scalars_df_index.set_index(["int64_too", "bool_col"])
scalars_df_index.set_index(index_cols)
.groupby(level=level, as_index=as_index)
.mean(numeric_only=True)
.to_pandas()
)
pd_result = (
scalars_pandas_df_index.set_index(["int64_too", "bool_col"])
scalars_pandas_df_index.set_index(index_cols)
.groupby(level=level, as_index=as_index)
.mean(numeric_only=True)
)
    # With as_index=False, pandas currently drops the index levels used as groupings.
    # A future pandas release will include them in the result; bigframes already does.
if not as_index:
for col in index_cols:
if col in bf_result.columns:
bf_result = bf_result.drop(col, axis=1)

# Pandas will have int64 index, while bigquery will have Int64 when resetting
pandas.testing.assert_frame_equal(bf_result, pd_result, check_index_type=False)
Expand Down

0 comments on commit ab49350

Please sign in to comment.