diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index df84f70859..6654892287 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -332,7 +332,6 @@ def value_counts( by_column_ids=columns, aggregations=[(dummy, agg_ops.count_op)], dropna=dropna, - as_index=True, ) count_id = agg_ids[0] if normalize: diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 6542b694d2..3163aa5b09 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -66,7 +66,7 @@ _MONOTONIC_DECREASING = "monotonic_decreasing" -LevelType = typing.Union[str, int] +LevelType = typing.Hashable LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]] @@ -941,7 +941,6 @@ def aggregate( by_column_ids: typing.Sequence[str] = (), aggregations: typing.Sequence[typing.Tuple[str, agg_ops.AggregateOp]] = (), *, - as_index: bool = True, dropna: bool = True, ) -> typing.Tuple[Block, typing.Sequence[str]]: """ @@ -962,40 +961,21 @@ def aggregate( aggregate_labels = self._get_labels_for_columns( [agg[0] for agg in aggregations] ) - if as_index: - names: typing.List[Label] = [] - for by_col_id in by_column_ids: - if by_col_id in self.value_columns: - names.append(self.col_id_to_label[by_col_id]) - else: - names.append(self.col_id_to_index_name[by_col_id]) - return ( - Block( - result_expr, - index_columns=by_column_ids, - column_labels=aggregate_labels, - index_labels=names, - ), - output_col_ids, - ) - else: # as_index = False - # If as_index=False, drop grouping levels, but keep grouping value columns - by_value_columns = [ - col for col in by_column_ids if col in self.value_columns - ] - by_column_labels = self._get_labels_for_columns(by_value_columns) - labels = (*by_column_labels, *aggregate_labels) - offsets_id = guid.generate_guid() - result_expr_pruned = result_expr.select_columns( - [*by_value_columns, *output_col_ids] - ).promote_offsets(offsets_id) - - return ( - Block( - result_expr_pruned, index_columns=[offsets_id], column_labels=labels - ), - output_col_ids, - ) + names: typing.List[Label] = [] + for by_col_id in by_column_ids: + if by_col_id in self.value_columns: + names.append(self.col_id_to_label[by_col_id]) + else: + names.append(self.col_id_to_index_name[by_col_id]) + return ( + Block( + result_expr, + index_columns=by_column_ids, + column_labels=aggregate_labels, + index_labels=names, + ), + output_col_ids, + ) def get_stat(self, column_id: str, stat: agg_ops.AggregateOp): """Gets aggregates immediately, and caches it""" @@ -1324,7 +1304,6 @@ def pivot( result_block, _ = block.aggregate( by_column_ids=self.index_columns, aggregations=aggregations, - as_index=True, dropna=True, ) diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index a8b8afdae7..3ee46ef675 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -263,10 +263,10 @@ def _agg_string(self, func: str) -> df.DataFrame: agg_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, aggregations=aggregations, - as_index=self._as_index, dropna=self._dropna, ) - return df.DataFrame(agg_block) + dataframe = df.DataFrame(agg_block) + return dataframe if self._as_index else self._convert_index(dataframe) def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: aggregations: typing.List[typing.Tuple[str, agg_ops.AggregateOp]] = [] @@ -285,7 +285,6 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: agg_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, aggregations=aggregations, - as_index=self._as_index, dropna=self._dropna, ) if want_aggfunc_level: @@ -297,7 +296,8 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: ) else: agg_block = agg_block.with_column_labels(pd.Index(column_labels)) - return df.DataFrame(agg_block) + dataframe = df.DataFrame(agg_block) + return dataframe if self._as_index else self._convert_index(dataframe) def _agg_list(self, func: typing.Sequence) -> df.DataFrame: aggregations = [ @@ -311,7 +311,6 @@ def _agg_list(self, func: typing.Sequence) -> df.DataFrame: agg_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, aggregations=aggregations, - as_index=self._as_index, dropna=self._dropna, ) agg_block = agg_block.with_column_labels( @@ -319,7 +318,8 @@ def _agg_list(self, func: typing.Sequence) -> df.DataFrame: column_labels, names=[*self._block.column_labels.names, None] ) ) - return df.DataFrame(agg_block) + dataframe = df.DataFrame(agg_block) + return dataframe if self._as_index else self._convert_index(dataframe) def _agg_named(self, **kwargs) -> df.DataFrame: aggregations = [] @@ -339,11 +339,21 @@ def _agg_named(self, **kwargs) -> df.DataFrame: agg_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, aggregations=aggregations, - as_index=self._as_index, dropna=self._dropna, ) agg_block = agg_block.with_column_labels(column_labels) - return df.DataFrame(agg_block) + dataframe = df.DataFrame(agg_block) + return dataframe if self._as_index else self._convert_index(dataframe) + + def _convert_index(self, dataframe: df.DataFrame): + """Convert index levels to columns except where names conflict.""" + levels_to_drop = [ + level for level in dataframe.index.names if level in dataframe.columns + ] + + if len(levels_to_drop) == dataframe.index.nlevels: + return dataframe.reset_index(drop=True) + return dataframe.droplevel(levels_to_drop).reset_index(drop=False) aggregate = agg @@ -379,10 +389,10 @@ def _aggregate_all( result_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, aggregations=aggregations, - as_index=self._as_index, dropna=self._dropna, ) - return df.DataFrame(result_block) + dataframe = df.DataFrame(result_block) + return dataframe if self._as_index else self._convert_index(dataframe) def _apply_window_op( self, diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 1d8169960b..98aa8f1185 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -72,7 +72,7 @@ # TODO(tbergeron): Convert to bytes-based limit MAX_INLINE_DF_SIZE = 5000 -LevelType = typing.Union[str, int] +LevelType = typing.Hashable LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]] SingleItemValue = Union[bigframes.series.Series, int, float, Callable] @@ -1956,7 +1956,7 @@ def _stack_mono(self): def _stack_multi(self, level: LevelsType = -1): n_levels = self.columns.nlevels - if isinstance(level, int) or isinstance(level, str): + if not utils.is_list_like(level): level = [level] level_indices = [] for level_ref in level: @@ -1966,7 +1966,7 @@ def _stack_multi(self, level: LevelsType = -1): else: level_indices.append(level_ref) else: # str - level_indices.append(self.columns.names.index(level_ref)) + level_indices.append(self.columns.names.index(level_ref)) # type: ignore new_order = [ *[i for i in range(n_levels) if i not in level_indices], @@ -1982,7 +1982,7 @@ def _stack_multi(self, level: LevelsType = -1): return DataFrame(block) def unstack(self, level: LevelsType = -1): - if isinstance(level, int) or isinstance(level, str): + if not utils.is_list_like(level): level = [level] block = self._block diff --git a/bigframes/series.py b/bigframes/series.py index 1b9982877a..6837c1c7f8 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -841,7 +841,6 @@ def mode(self) -> Series: block, agg_ids = block.aggregate( by_column_ids=[self._value_column], aggregations=((self._value_column, agg_ops.count_op),), - as_index=False, ) value_count_col_id = agg_ids[0] block, max_value_count_col_id = block.apply_window_op( @@ -855,14 +854,15 @@ def mode(self) -> Series: ops.eq_op, ) block = block.filter(is_mode_col_id) - mode_values_series = Series( - block.select_column(self._value_column).assign_label( - self._value_column, self.name - ) - ) - return typing.cast( - Series, mode_values_series.sort_values().reset_index(drop=True) + # use temporary name for reset_index to avoid collision, restore after dropping extra columns + block = ( + block.with_index_labels(["mode_temp_internal"]) + .order_by([OrderingColumnReference(self._value_column)]) + .reset_index(drop=False) ) + block = block.select_column(self._value_column).with_column_labels([self.name]) + mode_values_series = Series(block.select_column(self._value_column)) + return typing.cast(Series, mode_values_series) def mean(self) -> float: return typing.cast(float, self._apply_aggregation(agg_ops.mean_op)) diff --git a/tests/system/small/test_groupby.py b/tests/system/small/test_groupby.py index 5214905186..2919c167ef 100644 --- a/tests/system/small/test_groupby.py +++ b/tests/system/small/test_groupby.py @@ -122,23 +122,32 @@ def test_dataframe_groupby_agg_list(scalars_df_index, scalars_pandas_df_index): pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False) +@pytest.mark.parametrize( + ("as_index"), + [ + (True), + (False), + ], +) def test_dataframe_groupby_agg_dict_with_list( - scalars_df_index, scalars_pandas_df_index + scalars_df_index, scalars_pandas_df_index, as_index ): col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"] bf_result = ( scalars_df_index[col_names] - .groupby("string_col") + .groupby("string_col", as_index=as_index) .agg({"int64_too": ["mean", "max"], "string_col": "count"}) ) pd_result = ( scalars_pandas_df_index[col_names] - .groupby("string_col") + .groupby("string_col", as_index=as_index) .agg({"int64_too": ["mean", "max"], "string_col": "count"}) ) bf_result_computed = bf_result.to_pandas() - pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False) + pd.testing.assert_frame_equal( + pd_result, bf_result_computed, check_dtype=False, check_index_type=False + ) def test_dataframe_groupby_agg_dict_no_lists(scalars_df_index, scalars_pandas_df_index): diff --git a/tests/system/small/test_multiindex.py b/tests/system/small/test_multiindex.py index e7e93849c6..1708735f4c 100644 --- a/tests/system/small/test_multiindex.py +++ b/tests/system/small/test_multiindex.py @@ -356,17 +356,24 @@ def test_multi_index_dataframe_groupby(scalars_df_index, scalars_pandas_df_index def test_multi_index_dataframe_groupby_level_aggregate( scalars_df_index, scalars_pandas_df_index, level, as_index ): + index_cols = ["int64_too", "bool_col"] bf_result = ( - scalars_df_index.set_index(["int64_too", "bool_col"]) + scalars_df_index.set_index(index_cols) .groupby(level=level, as_index=as_index) .mean(numeric_only=True) .to_pandas() ) pd_result = ( - scalars_pandas_df_index.set_index(["int64_too", "bool_col"]) + scalars_pandas_df_index.set_index(index_cols) .groupby(level=level, as_index=as_index) .mean(numeric_only=True) ) + # For as_index=False, pandas will drop index levels used as groupings + # In the future, it will include this in the result, bigframes already does this behavior + if not as_index: + for col in index_cols: + if col in bf_result.columns: + bf_result = bf_result.drop(col, axis=1) # Pandas will have int64 index, while bigquery will have Int64 when resetting pandas.testing.assert_frame_equal(bf_result, pd_result, check_index_type=False)