Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: fix DataFrameGroupby.agg() issue with as_index=False #273

Merged
merged 11 commits into from
Dec 19, 2023
1 change: 0 additions & 1 deletion bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ def value_counts(
by_column_ids=columns,
aggregations=[(dummy, agg_ops.count_op)],
dropna=dropna,
as_index=True,
)
count_id = agg_ids[0]
if normalize:
Expand Down
53 changes: 16 additions & 37 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
_MONOTONIC_DECREASING = "monotonic_decreasing"


LevelType = typing.Union[str, int]
LevelType = typing.Hashable
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]


Expand Down Expand Up @@ -941,7 +941,6 @@ def aggregate(
by_column_ids: typing.Sequence[str] = (),
aggregations: typing.Sequence[typing.Tuple[str, agg_ops.AggregateOp]] = (),
*,
as_index: bool = True,
dropna: bool = True,
) -> typing.Tuple[Block, typing.Sequence[str]]:
"""
Expand All @@ -962,40 +961,21 @@ def aggregate(
aggregate_labels = self._get_labels_for_columns(
[agg[0] for agg in aggregations]
)
if as_index:
names: typing.List[Label] = []
for by_col_id in by_column_ids:
if by_col_id in self.value_columns:
names.append(self.col_id_to_label[by_col_id])
else:
names.append(self.col_id_to_index_name[by_col_id])
return (
Block(
result_expr,
index_columns=by_column_ids,
column_labels=aggregate_labels,
index_labels=names,
),
output_col_ids,
)
else: # as_index = False
# If as_index=False, drop grouping levels, but keep grouping value columns
by_value_columns = [
col for col in by_column_ids if col in self.value_columns
]
by_column_labels = self._get_labels_for_columns(by_value_columns)
labels = (*by_column_labels, *aggregate_labels)
offsets_id = guid.generate_guid()
result_expr_pruned = result_expr.select_columns(
[*by_value_columns, *output_col_ids]
).promote_offsets(offsets_id)

return (
Block(
result_expr_pruned, index_columns=[offsets_id], column_labels=labels
),
output_col_ids,
)
names: typing.List[Label] = []
for by_col_id in by_column_ids:
if by_col_id in self.value_columns:
names.append(self.col_id_to_label[by_col_id])
else:
names.append(self.col_id_to_index_name[by_col_id])
return (
Block(
result_expr,
index_columns=by_column_ids,
column_labels=aggregate_labels,
index_labels=names,
),
output_col_ids,
)

def get_stat(self, column_id: str, stat: agg_ops.AggregateOp):
"""Gets aggregates immediately, and caches it"""
Expand Down Expand Up @@ -1324,7 +1304,6 @@ def pivot(
result_block, _ = block.aggregate(
by_column_ids=self.index_columns,
aggregations=aggregations,
as_index=True,
dropna=True,
)

Expand Down
30 changes: 20 additions & 10 deletions bigframes/core/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,10 @@ def _agg_string(self, func: str) -> df.DataFrame:
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
return df.DataFrame(agg_block)
dataframe = df.DataFrame(agg_block)
return dataframe if self._as_index else self._convert_index(dataframe)

def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
aggregations: typing.List[typing.Tuple[str, agg_ops.AggregateOp]] = []
Expand All @@ -285,7 +285,6 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
if want_aggfunc_level:
Expand All @@ -297,7 +296,8 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
)
else:
agg_block = agg_block.with_column_labels(pd.Index(column_labels))
return df.DataFrame(agg_block)
dataframe = df.DataFrame(agg_block)
return dataframe if self._as_index else self._convert_index(dataframe)

def _agg_list(self, func: typing.Sequence) -> df.DataFrame:
aggregations = [
Expand All @@ -311,15 +311,15 @@ def _agg_list(self, func: typing.Sequence) -> df.DataFrame:
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
agg_block = agg_block.with_column_labels(
pd.MultiIndex.from_tuples(
column_labels, names=[*self._block.column_labels.names, None]
)
)
return df.DataFrame(agg_block)
dataframe = df.DataFrame(agg_block)
return dataframe if self._as_index else self._convert_index(dataframe)

def _agg_named(self, **kwargs) -> df.DataFrame:
aggregations = []
Expand All @@ -339,11 +339,21 @@ def _agg_named(self, **kwargs) -> df.DataFrame:
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
agg_block = agg_block.with_column_labels(column_labels)
return df.DataFrame(agg_block)
dataframe = df.DataFrame(agg_block)
return dataframe if self._as_index else self._convert_index(dataframe)

def _convert_index(self, dataframe: df.DataFrame):
"""Convert index levels to columns except where names conflict."""
levels_to_drop = [
level for level in dataframe.index.names if level in dataframe.columns
]

if len(levels_to_drop) == dataframe.index.nlevels:
return dataframe.reset_index(drop=True)
return dataframe.droplevel(levels_to_drop).reset_index(drop=False)

aggregate = agg

Expand Down Expand Up @@ -379,10 +389,10 @@ def _aggregate_all(
result_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
aggregations=aggregations,
as_index=self._as_index,
dropna=self._dropna,
)
return df.DataFrame(result_block)
dataframe = df.DataFrame(result_block)
return dataframe if self._as_index else self._convert_index(dataframe)

def _apply_window_op(
self,
Expand Down
8 changes: 4 additions & 4 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
# TODO(tbergeron): Convert to bytes-based limit
MAX_INLINE_DF_SIZE = 5000

LevelType = typing.Union[str, int]
LevelType = typing.Hashable
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
SingleItemValue = Union[bigframes.series.Series, int, float, Callable]

Expand Down Expand Up @@ -1956,7 +1956,7 @@ def _stack_mono(self):

def _stack_multi(self, level: LevelsType = -1):
n_levels = self.columns.nlevels
if isinstance(level, int) or isinstance(level, str):
if not utils.is_list_like(level):
level = [level]
level_indices = []
for level_ref in level:
Expand All @@ -1966,7 +1966,7 @@ def _stack_multi(self, level: LevelsType = -1):
else:
level_indices.append(level_ref)
else: # str
level_indices.append(self.columns.names.index(level_ref))
level_indices.append(self.columns.names.index(level_ref)) # type: ignore

new_order = [
*[i for i in range(n_levels) if i not in level_indices],
Expand All @@ -1982,7 +1982,7 @@ def _stack_multi(self, level: LevelsType = -1):
return DataFrame(block)

def unstack(self, level: LevelsType = -1):
if isinstance(level, int) or isinstance(level, str):
if not utils.is_list_like(level):
level = [level]

block = self._block
Expand Down
16 changes: 8 additions & 8 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,6 @@ def mode(self) -> Series:
block, agg_ids = block.aggregate(
by_column_ids=[self._value_column],
aggregations=((self._value_column, agg_ops.count_op),),
as_index=False,
)
value_count_col_id = agg_ids[0]
block, max_value_count_col_id = block.apply_window_op(
Expand All @@ -855,14 +854,15 @@ def mode(self) -> Series:
ops.eq_op,
)
block = block.filter(is_mode_col_id)
mode_values_series = Series(
block.select_column(self._value_column).assign_label(
self._value_column, self.name
)
)
return typing.cast(
Series, mode_values_series.sort_values().reset_index(drop=True)
# use temporary name for reset_index to avoid collision, restore after dropping extra columns
block = (
block.with_index_labels(["mode_temp_internal"])
.order_by([OrderingColumnReference(self._value_column)])
.reset_index(drop=False)
)
block = block.select_column(self._value_column).with_column_labels([self.name])
mode_values_series = Series(block.select_column(self._value_column))
return typing.cast(Series, mode_values_series)

def mean(self) -> float:
return typing.cast(float, self._apply_aggregation(agg_ops.mean_op))
Expand Down
17 changes: 13 additions & 4 deletions tests/system/small/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,23 +122,32 @@ def test_dataframe_groupby_agg_list(scalars_df_index, scalars_pandas_df_index):
pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False)


@pytest.mark.parametrize(
("as_index"),
[
(True),
(False),
],
)
def test_dataframe_groupby_agg_dict_with_list(
scalars_df_index, scalars_pandas_df_index
scalars_df_index, scalars_pandas_df_index, as_index
):
col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"]
bf_result = (
scalars_df_index[col_names]
.groupby("string_col")
.groupby("string_col", as_index=as_index)
.agg({"int64_too": ["mean", "max"], "string_col": "count"})
)
pd_result = (
scalars_pandas_df_index[col_names]
.groupby("string_col")
.groupby("string_col", as_index=as_index)
.agg({"int64_too": ["mean", "max"], "string_col": "count"})
)
bf_result_computed = bf_result.to_pandas()

pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False)
pd.testing.assert_frame_equal(
pd_result, bf_result_computed, check_dtype=False, check_index_type=False
)


def test_dataframe_groupby_agg_dict_no_lists(scalars_df_index, scalars_pandas_df_index):
Expand Down
11 changes: 9 additions & 2 deletions tests/system/small/test_multiindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,17 +356,24 @@ def test_multi_index_dataframe_groupby(scalars_df_index, scalars_pandas_df_index
def test_multi_index_dataframe_groupby_level_aggregate(
scalars_df_index, scalars_pandas_df_index, level, as_index
):
index_cols = ["int64_too", "bool_col"]
bf_result = (
scalars_df_index.set_index(["int64_too", "bool_col"])
scalars_df_index.set_index(index_cols)
.groupby(level=level, as_index=as_index)
.mean(numeric_only=True)
.to_pandas()
)
pd_result = (
scalars_pandas_df_index.set_index(["int64_too", "bool_col"])
scalars_pandas_df_index.set_index(index_cols)
.groupby(level=level, as_index=as_index)
.mean(numeric_only=True)
)
# For as_index=False, pandas drops the index levels that were used as groupings.
# A future pandas version will include them in the result; bigframes already
# behaves that way, so drop them from the bigframes result before comparing.
if not as_index:
for col in index_cols:
if col in bf_result.columns:
bf_result = bf_result.drop(col, axis=1)

# Pandas will have int64 index, while bigquery will have Int64 when resetting
pandas.testing.assert_frame_equal(bf_result, pd_result, check_index_type=False)
Expand Down