From bcd93e014b002f211c5b855afa7d7b91cea3f9ea Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Mon, 22 May 2023 11:38:25 -0700 Subject: [PATCH 01/10] ENH: non float64 result support in numba groupby --- asv_bench/benchmarks/groupby.py | 21 +++ pandas/core/_numba/executor.py | 67 ++++++--- pandas/core/_numba/kernels/sum_.py | 213 ++++++++++++++++++----------- pandas/core/groupby/groupby.py | 117 ++++++++++++++-- pandas/io/formats/info.py | 4 +- pandas/tests/groupby/test_numba.py | 12 +- 6 files changed, 312 insertions(+), 122 deletions(-) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index 4c0f3ddd826b7..1dd90bbf48dd4 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -4,6 +4,7 @@ import numpy as np +import pandas as pd from pandas import ( NA, Categorical, @@ -515,9 +516,17 @@ def setup(self, dtype, method, application, ncols): def time_dtype_as_group(self, dtype, method, application, ncols): self.as_group_method() + def time_dtype_as_group_numba(self, dtype, method, application, ncols): + with pd.option_context("compute.use_numba", True): + self.as_group_method() + def time_dtype_as_field(self, dtype, method, application, ncols): self.as_field_method() + def time_dtype_as_field_numba(self, dtype, method, application, ncols): + with pd.option_context("compute.use_numba", True): + self.as_field_method() + class GroupByCythonAgg: """ @@ -554,6 +563,18 @@ def time_frame_agg(self, dtype, method): self.df.groupby("key").agg(method) +class GroupByNumbaAgg(GroupByCythonAgg): + """ + Benchmarks specifically targeting our numba aggregation algorithms + (using a big enough dataframe with simple key, so a large part of the + time is actually spent in the grouped aggregation). 
+ """ + + def time_frame_agg(self, dtype, method): + with pd.option_context("compute.use_numba", True): + self.df.groupby("key").agg(method) + + class GroupByCythonAggEaDtypes: """ Benchmarks specifically targeting our cython aggregation algorithms diff --git a/pandas/core/_numba/executor.py b/pandas/core/_numba/executor.py index 891772602c442..9047820c6ec2c 100644 --- a/pandas/core/_numba/executor.py +++ b/pandas/core/_numba/executor.py @@ -15,8 +15,45 @@ @functools.lru_cache(maxsize=None) +def make_looper(func, result_dtype, nopython, nogil, parallel): + if TYPE_CHECKING: + import numba + else: + numba = import_optional_dependency("numba") + + @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) + def column_looper( + values: np.ndarray, + start: np.ndarray, + end: np.ndarray, + min_periods: int, + *args, + ): + result = np.empty((len(start), values.shape[0]), dtype=result_dtype) + for i in numba.prange(values.shape[0]): + result[:, i] = func(values[i], start, end, min_periods, *args) + return result.T + + return column_looper + + +default_dtype_mapping = { + np.dtype("int8"): np.int64, + np.dtype("int16"): np.int64, + np.dtype("int32"): np.int64, + np.dtype("int64"): np.int64, + np.dtype("uint8"): np.uint64, + np.dtype("uint16"): np.uint64, + np.dtype("uint32"): np.uint64, + np.dtype("uint64"): np.uint64, + np.dtype("float32"): np.float64, + np.dtype("float64"): np.float64, +} + + def generate_shared_aggregator( func: Callable[..., Scalar], + dtype_mapping: dict[np.dtype, np.dtype], nopython: bool, nogil: bool, parallel: bool, @@ -29,6 +66,11 @@ def generate_shared_aggregator( ---------- func : function aggregation function to be applied to each column + dtype_mapping: dict or None + If not None, maps a dtype to a result dtype. + Otherwise, will fall back to default mapping. 
+ all integers -> int64 + all floats -> float64 nopython : bool nopython to be passed into numba.jit nogil : bool @@ -40,22 +82,13 @@ def generate_shared_aggregator( ------- Numba function """ - if TYPE_CHECKING: - import numba - else: - numba = import_optional_dependency("numba") - @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) - def column_looper( - values: np.ndarray, - start: np.ndarray, - end: np.ndarray, - min_periods: int, - *args, - ): - result = np.empty((len(start), values.shape[1]), dtype=np.float64) - for i in numba.prange(values.shape[1]): - result[:, i] = func(values[:, i], start, end, min_periods, *args) - return result + # A wrapper around the looper function, + # to dispatch based on dtype since numba is unable to do that in nopython mode + def looper_wrapper(values, start, end, min_periods, **kwargs): + result_dtype = dtype_mapping[values.dtype] + column_looper = make_looper(func, result_dtype, nopython, nogil, parallel) + # Need to unpack kwargs since numba only supports *args + return column_looper(values, start, end, min_periods, *kwargs.values()) - return column_looper + return looper_wrapper diff --git a/pandas/core/_numba/kernels/sum_.py b/pandas/core/_numba/kernels/sum_.py index 056897189fe67..731c51e24d6e5 100644 --- a/pandas/core/_numba/kernels/sum_.py +++ b/pandas/core/_numba/kernels/sum_.py @@ -52,87 +52,142 @@ def remove_sum( return nobs, sum_x, compensation -@numba.jit(nopython=True, nogil=True, parallel=False) -def sliding_sum( - values: np.ndarray, - start: np.ndarray, - end: np.ndarray, - min_periods: int, +def sliding_sum(values, start, end, min_periods): + # Need a dummy function to overload + pass + + +@numba.extending.overload(sliding_sum) +def sliding_sum_wrapper( + values: np.ndarray, start: np.ndarray, end: np.ndarray, min_periods: int ) -> np.ndarray: - N = len(start) - nobs = 0 - sum_x = 0.0 - compensation_add = 0.0 - compensation_remove = 0.0 - - is_monotonic_increasing_bounds = is_monotonic_increasing( - start - ) and is_monotonic_increasing(end) - - output = np.empty(N, dtype=np.float64) - - for i in range(N): - s = start[i] - e = end[i] - if i == 0 or not is_monotonic_increasing_bounds: - prev_value = values[s] - num_consecutive_same_value = 0 - - for j in range(s, e): - val = values[j] - ( - nobs, - sum_x, - compensation_add, - num_consecutive_same_value, - prev_value, - ) = add_sum( - val, - nobs, - sum_x, - compensation_add, - num_consecutive_same_value, - prev_value, - ) - else: - for j in range(start[i - 1], s): - val = values[j] - nobs, sum_x, compensation_remove = remove_sum( - val, nobs, sum_x, compensation_remove - ) - - for j in range(end[i - 1], e): - val = values[j] - ( - nobs, - sum_x, - compensation_add, - num_consecutive_same_value, - prev_value, - ) = add_sum( - val, - nobs, - sum_x, - compensation_add, - num_consecutive_same_value, - prev_value, - ) - - if nobs == 0 == min_periods: - result = 0.0 - elif nobs >= min_periods: - if num_consecutive_same_value >= nobs: - result = prev_value * nobs + dtype = values.dtype + + if isinstance(dtype, numba.types.Integer): + na_val = 0 + + @numba.extending.register_jitable(inline="always") + def process_na_func(x, na_pos): + return na_pos.append(x) + + else: + na_val = np.nan + + @numba.extending.register_jitable(inline="always") + def process_na_func(x, na_pos): + pass + + @numba.extending.register_jitable + def sliding_sum( + values: np.ndarray, + start: np.ndarray, + end: np.ndarray, + min_periods: int, + ) -> np.ndarray: + N = len(start) + nobs = 0 + sum_x = 0 + 
compensation_add = 0 + compensation_remove = 0 + # Stores positions of the na_values + # Trick to force list to be inferred as int list + # https://numba.pydata.org/numba-doc/latest/user/troubleshoot.html#my-code-has-an-untyped-list-problem + na_pos = [0 for i in range(0)] + + is_monotonic_increasing_bounds = is_monotonic_increasing( + start + ) and is_monotonic_increasing(end) + + # output = np.empty(N, dtype=dtype) + output = np.empty(N, dtype=np.float64) + + for i in range(N): + s = start[i] + e = end[i] + if i == 0 or not is_monotonic_increasing_bounds: + prev_value = values[s] + num_consecutive_same_value = 0 + + for j in range(s, e): + val = values[j] + ( + nobs, + sum_x, + compensation_add, + num_consecutive_same_value, + prev_value, + ) = add_sum( + val, + nobs, + sum_x, + compensation_add, + num_consecutive_same_value, + prev_value, + ) else: - result = sum_x - else: - result = np.nan + for j in range(start[i - 1], s): + val = values[j] + nobs, sum_x, compensation_remove = remove_sum( + val, nobs, sum_x, compensation_remove + ) + + for j in range(end[i - 1], e): + val = values[j] + ( + nobs, + sum_x, + compensation_add, + num_consecutive_same_value, + prev_value, + ) = add_sum( + val, + nobs, + sum_x, + compensation_add, + num_consecutive_same_value, + prev_value, + ) + + if nobs == 0 == min_periods: + result = 0 + elif nobs >= min_periods: + if num_consecutive_same_value >= nobs: + result = prev_value * nobs + else: + result = sum_x + else: + result = na_val + na_pos = process_na_func(i, na_pos) + + output[i] = result + + if not is_monotonic_increasing_bounds: + nobs = 0 + sum_x = 0 + compensation_remove = 0 + + return output, na_pos + + if isinstance(dtype, numba.types.Integer): + + def sliding_sum_with_nan( + values: np.ndarray, start: np.ndarray, end: np.ndarray, min_periods: int + ): + output, na_pos = sliding_sum(values, start, end, min_periods) + output = output.astype("float64") + # Fancy indexing w/ list doesn't yet work for numba + # xref https://github.com/numba/numba/issues/8616 + # output[na_pos] = np.nan + for pos in na_pos: + output[pos] = np.nan + return output - output[i] = result + return sliding_sum_with_nan + else: - if not is_monotonic_increasing_bounds: - nobs = 0 - sum_x = 0.0 - compensation_remove = 0.0 + def postprocess_sliding_sum( + values: np.ndarray, start: np.ndarray, end: np.ndarray, min_periods: int + ): + return sliding_sum(values, start, end, min_periods)[0] - return output + return postprocess_sliding_sum diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index bdab641719ded..014e7031f3225 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -1347,8 +1347,9 @@ def _numba_prep(self, data: DataFrame): def _numba_agg_general( self, func: Callable, + dtype_mapping: dict[np.dtype, np.dtype], engine_kwargs: dict[str, bool] | None, - *aggregator_args, + **aggregator_kwargs, ): """ Perform groupby with a standard numerical aggregation function (e.g. 
mean) @@ -1363,19 +1364,35 @@ def _numba_agg_general( data = self._obj_with_exclusions df = data if data.ndim == 2 else data.to_frame() - starts, ends, sorted_index, sorted_data = self._numba_prep(df) + + # starts, ends, sorted_index, sorted_data = self._numba_prep(df) + + sorted_df = df.take(self.grouper._sort_idx, axis=self.axis) + sorted_ids = self.grouper._sorted_ids + _, _, ngroups = self.grouper.group_info + starts, ends = lib.generate_slices(sorted_ids, ngroups) aggregator = executor.generate_shared_aggregator( - func, **get_jit_arguments(engine_kwargs) + func, dtype_mapping, **get_jit_arguments(engine_kwargs) + ) + result = sorted_df._mgr.apply( + aggregator, start=starts, end=ends, min_periods=0, **aggregator_kwargs ) - result = aggregator(sorted_data, starts, ends, 0, *aggregator_args) + result.axes[1] = self.grouper.result_index + result = df._constructor(result) + # result = aggregator(sorted_data, starts, ends, 0, *aggregator_args) - index = self.grouper.result_index + # index = self.grouper.result_index if data.ndim == 1: - result_kwargs = {"name": data.name} - result = result.ravel() + # result_kwargs = {"name": data.name} + # result = result.ravel() + result.name = data.name + result = result.squeeze("columns") else: - result_kwargs = {"columns": data.columns} - return data._constructor(result, index=index, **result_kwargs) + # result_kwargs = {"columns": data.columns} + result.columns = data.columns + # result.index = index + return result + # return data._constructor(result, index=index, **result_kwargs) @final def _transform_with_numba(self, func, *args, engine_kwargs=None, **kwargs): @@ -1958,7 +1975,18 @@ def mean( if maybe_use_numba(engine): from pandas.core._numba.kernels import sliding_mean - return self._numba_agg_general(sliding_mean, engine_kwargs) + dtype_mapping = executor.default_dtype_mapping.copy() + # Need to map all integer types -> float + dtype_mapping[np.dtype("int8")] = np.float64 + dtype_mapping[np.dtype("int16")] = np.float64 + dtype_mapping[np.dtype("int32")] = np.float64 + dtype_mapping[np.dtype("int64")] = np.float64 + dtype_mapping[np.dtype("uint8")] = np.float64 + dtype_mapping[np.dtype("uint16")] = np.float64 + dtype_mapping[np.dtype("uint32")] = np.float64 + dtype_mapping[np.dtype("uint64")] = np.float64 + + return self._numba_agg_general(sliding_mean, dtype_mapping, engine_kwargs) else: result = self._cython_agg_general( "mean", @@ -2049,7 +2077,22 @@ def std( if maybe_use_numba(engine): from pandas.core._numba.kernels import sliding_var - return np.sqrt(self._numba_agg_general(sliding_var, engine_kwargs, ddof)) + dtype_mapping = executor.default_dtype_mapping.copy() + # Need to map all integer types -> float + dtype_mapping[np.dtype("int8")] = np.float64 + dtype_mapping[np.dtype("int16")] = np.float64 + dtype_mapping[np.dtype("int32")] = np.float64 + dtype_mapping[np.dtype("int64")] = np.float64 + dtype_mapping[np.dtype("uint8")] = np.float64 + dtype_mapping[np.dtype("uint16")] = np.float64 + dtype_mapping[np.dtype("uint32")] = np.float64 + dtype_mapping[np.dtype("uint64")] = np.float64 + + return np.sqrt( + self._numba_agg_general( + sliding_var, dtype_mapping, engine_kwargs, ddof=ddof + ) + ) else: return self._cython_agg_general( "std", @@ -2112,7 +2155,20 @@ def var( if maybe_use_numba(engine): from pandas.core._numba.kernels import sliding_var - return self._numba_agg_general(sliding_var, engine_kwargs, ddof) + dtype_mapping = executor.default_dtype_mapping.copy() + # Need to map all integer types -> float + 
dtype_mapping[np.dtype("int8")] = np.float64 + dtype_mapping[np.dtype("int16")] = np.float64 + dtype_mapping[np.dtype("int32")] = np.float64 + dtype_mapping[np.dtype("int64")] = np.float64 + dtype_mapping[np.dtype("uint8")] = np.float64 + dtype_mapping[np.dtype("uint16")] = np.float64 + dtype_mapping[np.dtype("uint32")] = np.float64 + dtype_mapping[np.dtype("uint64")] = np.float64 + + return self._numba_agg_general( + sliding_var, dtype_mapping, engine_kwargs, ddof=ddof + ) else: return self._cython_agg_general( "var", @@ -2335,8 +2391,11 @@ def sum( if maybe_use_numba(engine): from pandas.core._numba.kernels import sliding_sum + dtype_mapping = executor.default_dtype_mapping.copy() + return self._numba_agg_general( sliding_sum, + dtype_mapping, engine_kwargs, ) else: @@ -2372,7 +2431,22 @@ def min( if maybe_use_numba(engine): from pandas.core._numba.kernels import sliding_min_max - return self._numba_agg_general(sliding_min_max, engine_kwargs, False) + dtype_mapping = { + np.dtype("int8"): np.int8, + np.dtype("int16"): np.int16, + np.dtype("int32"): np.int32, + np.dtype("int64"): np.int64, + np.dtype("uint8"): np.uint8, + np.dtype("uint16"): np.uint16, + np.dtype("uint32"): np.uint32, + np.dtype("uint64"): np.uint64, + np.dtype("float32"): np.float32, + np.dtype("float64"): np.float64, + } + + return self._numba_agg_general( + sliding_min_max, dtype_mapping, engine_kwargs, is_max=False + ) else: return self._agg_general( numeric_only=numeric_only, @@ -2393,7 +2467,22 @@ def max( if maybe_use_numba(engine): from pandas.core._numba.kernels import sliding_min_max - return self._numba_agg_general(sliding_min_max, engine_kwargs, True) + dtype_mapping = { + np.dtype("int8"): np.int8, + np.dtype("int16"): np.int16, + np.dtype("int32"): np.int32, + np.dtype("int64"): np.int64, + np.dtype("uint8"): np.uint8, + np.dtype("uint16"): np.uint16, + np.dtype("uint32"): np.uint32, + np.dtype("uint64"): np.uint64, + np.dtype("float32"): np.float32, + np.dtype("float64"): np.float64, + } + + return self._numba_agg_general( + sliding_min_max, dtype_mapping, engine_kwargs, is_max=True + ) else: return self._agg_general( numeric_only=numeric_only, diff --git a/pandas/io/formats/info.py b/pandas/io/formats/info.py index 55dacd0c268ff..260620e145105 100644 --- a/pandas/io/formats/info.py +++ b/pandas/io/formats/info.py @@ -14,8 +14,6 @@ Sequence, ) -import numpy as np - from pandas._config import get_option from pandas.io.formats import format as fmt @@ -1099,4 +1097,4 @@ def _get_dataframe_dtype_counts(df: DataFrame) -> Mapping[str, int]: Create mapping between datatypes and their number of occurrences. """ # groupby dtype.name to collect e.g. 
Categorical columns - return df.dtypes.value_counts().groupby(lambda x: x.name).sum().astype(np.intp) + return df.dtypes.value_counts().groupby(lambda x: x.name).sum() diff --git a/pandas/tests/groupby/test_numba.py b/pandas/tests/groupby/test_numba.py index 4eb7b6a7b5bea..3946fc6626ac1 100644 --- a/pandas/tests/groupby/test_numba.py +++ b/pandas/tests/groupby/test_numba.py @@ -24,9 +24,7 @@ def test_cython_vs_numba_frame( engine="numba", engine_kwargs=engine_kwargs, **kwargs ) expected = getattr(gb, func)(**kwargs) - # check_dtype can be removed if GH 44952 is addressed - check_dtype = func not in ("sum", "min", "max") - tm.assert_frame_equal(result, expected, check_dtype=check_dtype) + tm.assert_frame_equal(result, expected) def test_cython_vs_numba_getitem( self, sort, nogil, parallel, nopython, numba_supported_reductions @@ -39,9 +37,7 @@ def test_cython_vs_numba_getitem( engine="numba", engine_kwargs=engine_kwargs, **kwargs ) expected = getattr(gb, func)(**kwargs) - # check_dtype can be removed if GH 44952 is addressed - check_dtype = func not in ("sum", "min", "max") - tm.assert_series_equal(result, expected, check_dtype=check_dtype) + tm.assert_series_equal(result, expected) def test_cython_vs_numba_series( self, sort, nogil, parallel, nopython, numba_supported_reductions @@ -54,9 +50,7 @@ def test_cython_vs_numba_series( engine="numba", engine_kwargs=engine_kwargs, **kwargs ) expected = getattr(gb, func)(**kwargs) - # check_dtype can be removed if GH 44952 is addressed - check_dtype = func not in ("sum", "min", "max") - tm.assert_series_equal(result, expected, check_dtype=check_dtype) + tm.assert_series_equal(result, expected) def test_as_index_false_unsupported(self, numba_supported_reductions): func, kwargs = numba_supported_reductions From e22d783da6f62f90765c09d08b3e38289d5dc103 Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Fri, 26 May 2023 15:23:43 -0700 Subject: [PATCH 02/10] refactor & simplify --- pandas/core/_numba/executor.py | 40 ++++- pandas/core/_numba/kernels/mean_.py | 9 +- pandas/core/_numba/kernels/min_max_.py | 11 +- pandas/core/_numba/kernels/sum_.py | 212 ++++++++++--------------- pandas/core/_numba/kernels/var_.py | 9 +- pandas/core/groupby/groupby.py | 30 ++-- pandas/core/window/rolling.py | 31 +++- pandas/tests/groupby/conftest.py | 17 +- 8 files changed, 197 insertions(+), 162 deletions(-) diff --git a/pandas/core/_numba/executor.py b/pandas/core/_numba/executor.py index 9047820c6ec2c..457f0d716cab0 100644 --- a/pandas/core/_numba/executor.py +++ b/pandas/core/_numba/executor.py @@ -29,10 +29,16 @@ def column_looper( min_periods: int, *args, ): - result = np.empty((len(start), values.shape[0]), dtype=result_dtype) + result = np.empty((values.shape[0], len(start)), dtype=result_dtype) + na_positions = {} for i in numba.prange(values.shape[0]): - result[:, i] = func(values[i], start, end, min_periods, *args) - return result.T + output, na_pos = func( + values[i], result_dtype, start, end, min_periods, *args + ) + result[i] = output + if len(na_pos) > 0: + na_positions[i] = np.array(na_pos) + return result, na_positions return column_looper @@ -48,6 +54,8 @@ def column_looper( np.dtype("uint64"): np.uint64, np.dtype("float32"): np.float64, np.dtype("float64"): np.float64, + np.dtype("complex64"): np.complex64, + np.dtype("complex128"): np.complex128, } @@ -69,8 +77,6 @@ def generate_shared_aggregator( dtype_mapping: dict or None If not None, maps a dtype to a result dtype. 
Otherwise, will fall back to default mapping. - all integers -> int64 - all floats -> float64 nopython : bool nopython to be passed into numba.jit nogil : bool @@ -85,10 +91,32 @@ def generate_shared_aggregator( # A wrapper around the looper function, # to dispatch based on dtype since numba is unable to do that in nopython mode + + # It also post-processes the values by inserting nans where number of observations + # is less than min_periods + # Cannot do this in numba nopython mode + # (you'll run into type-unification error when you cast int -> float) def looper_wrapper(values, start, end, min_periods, **kwargs): result_dtype = dtype_mapping[values.dtype] column_looper = make_looper(func, result_dtype, nopython, nogil, parallel) # Need to unpack kwargs since numba only supports *args - return column_looper(values, start, end, min_periods, *kwargs.values()) + result, na_positions = column_looper( + values, start, end, min_periods, *kwargs.values() + ) + if result.dtype.kind == "i": + # Look if na_positions is not empty + # If so, convert the whole block + # This is OK since int dtype cannot hold nan, + # so if min_periods not satisfied for 1 col, it is not satisfied for + # all columns at that index + for na_pos in na_positions.values(): + if len(na_pos) > 0: + result = result.astype("float64") + break + # TODO: Optimize this + for i, na_pos in na_positions.items(): + if len(na_pos) > 0: + result[i, na_pos] = np.nan + return result return looper_wrapper diff --git a/pandas/core/_numba/kernels/mean_.py b/pandas/core/_numba/kernels/mean_.py index 725989e093441..aedfa0fae9955 100644 --- a/pandas/core/_numba/kernels/mean_.py +++ b/pandas/core/_numba/kernels/mean_.py @@ -60,6 +60,7 @@ def remove_mean( @numba.jit(nopython=True, nogil=True, parallel=False) def sliding_mean( values: np.ndarray, + result_dtype: np.dtype, start: np.ndarray, end: np.ndarray, min_periods: int, @@ -75,7 +76,7 @@ def sliding_mean( start ) and is_monotonic_increasing(end) - output = np.empty(N, dtype=np.float64) + output = np.empty(N, dtype=result_dtype) for i in range(N): s = start[i] @@ -147,4 +148,8 @@ def sliding_mean( neg_ct = 0 compensation_remove = 0.0 - return output + # na_position is empty list since float64 can already hold nans + # Do list comprehension, since numba cannot figure out that na_pos is + # empty list of ints on its own + na_pos = [0 for i in range(0)] + return output, na_pos diff --git a/pandas/core/_numba/kernels/min_max_.py b/pandas/core/_numba/kernels/min_max_.py index acba66a6e4f63..9cb649785ba03 100644 --- a/pandas/core/_numba/kernels/min_max_.py +++ b/pandas/core/_numba/kernels/min_max_.py @@ -15,6 +15,7 @@ @numba.jit(nopython=True, nogil=True, parallel=False) def sliding_min_max( values: np.ndarray, + result_dtype: np.dtype, start: np.ndarray, end: np.ndarray, min_periods: int, @@ -22,7 +23,8 @@ def sliding_min_max( ) -> np.ndarray: N = len(start) nobs = 0 - output = np.empty(N, dtype=np.float64) + output = np.empty(N, dtype=result_dtype) + na_pos = [] # Use deque once numba supports it # https://github.com/numba/numba/issues/7417 Q: list = [] @@ -64,6 +66,9 @@ def sliding_min_max( if Q and curr_win_size > 0 and nobs >= min_periods: output[i] = values[Q[0]] else: - output[i] = np.nan + if values.dtype.kind != "i": + output[i] = np.nan + else: + na_pos.append(i) - return output + return output, na_pos diff --git a/pandas/core/_numba/kernels/sum_.py b/pandas/core/_numba/kernels/sum_.py index 731c51e24d6e5..eadb8f4ff6a34 100644 --- a/pandas/core/_numba/kernels/sum_.py +++ 
b/pandas/core/_numba/kernels/sum_.py @@ -52,142 +52,98 @@ def remove_sum( return nobs, sum_x, compensation -def sliding_sum(values, start, end, min_periods): - # Need a dummy function to overload - pass - - -@numba.extending.overload(sliding_sum) -def sliding_sum_wrapper( - values: np.ndarray, start: np.ndarray, end: np.ndarray, min_periods: int +@numba.jit(nopython=True, nogil=True, parallel=False) +def sliding_sum( + values: np.ndarray, + result_dtype: np.dtype, + start: np.ndarray, + end: np.ndarray, + min_periods: int, ) -> np.ndarray: dtype = values.dtype - if isinstance(dtype, numba.types.Integer): + if dtype.kind == "i": na_val = 0 - - @numba.extending.register_jitable(inline="always") - def process_na_func(x, na_pos): - return na_pos.append(x) - else: na_val = np.nan - @numba.extending.register_jitable(inline="always") - def process_na_func(x, na_pos): - pass - - @numba.extending.register_jitable - def sliding_sum( - values: np.ndarray, - start: np.ndarray, - end: np.ndarray, - min_periods: int, - ) -> np.ndarray: - N = len(start) - nobs = 0 - sum_x = 0 - compensation_add = 0 - compensation_remove = 0 - # Stores positions of the na_values - # Trick to force list to be inferred as int list - # https://numba.pydata.org/numba-doc/latest/user/troubleshoot.html#my-code-has-an-untyped-list-problem - na_pos = [0 for i in range(0)] - - is_monotonic_increasing_bounds = is_monotonic_increasing( - start - ) and is_monotonic_increasing(end) - - # output = np.empty(N, dtype=dtype) - output = np.empty(N, dtype=np.float64) - - for i in range(N): - s = start[i] - e = end[i] - if i == 0 or not is_monotonic_increasing_bounds: - prev_value = values[s] - num_consecutive_same_value = 0 - - for j in range(s, e): - val = values[j] - ( - nobs, - sum_x, - compensation_add, - num_consecutive_same_value, - prev_value, - ) = add_sum( - val, - nobs, - sum_x, - compensation_add, - num_consecutive_same_value, - prev_value, - ) - else: - for j in range(start[i - 1], s): - val = values[j] - nobs, sum_x, compensation_remove = remove_sum( - val, nobs, sum_x, compensation_remove - ) - - for j in range(end[i - 1], e): - val = values[j] - ( - nobs, - sum_x, - compensation_add, - num_consecutive_same_value, - prev_value, - ) = add_sum( - val, - nobs, - sum_x, - compensation_add, - num_consecutive_same_value, - prev_value, - ) - - if nobs == 0 == min_periods: - result = 0 - elif nobs >= min_periods: - if num_consecutive_same_value >= nobs: - result = prev_value * nobs - else: - result = sum_x + N = len(start) + nobs = 0 + sum_x = 0 + compensation_add = 0 + compensation_remove = 0 + na_pos = [] + + is_monotonic_increasing_bounds = is_monotonic_increasing( + start + ) and is_monotonic_increasing(end) + + output = np.empty(N, dtype=result_dtype) + + for i in range(N): + s = start[i] + e = end[i] + if i == 0 or not is_monotonic_increasing_bounds: + prev_value = values[s] + num_consecutive_same_value = 0 + + for j in range(s, e): + val = values[j] + ( + nobs, + sum_x, + compensation_add, + num_consecutive_same_value, + prev_value, + ) = add_sum( + val, + nobs, + sum_x, + compensation_add, + num_consecutive_same_value, + prev_value, + ) + else: + for j in range(start[i - 1], s): + val = values[j] + nobs, sum_x, compensation_remove = remove_sum( + val, nobs, sum_x, compensation_remove + ) + + for j in range(end[i - 1], e): + val = values[j] + ( + nobs, + sum_x, + compensation_add, + num_consecutive_same_value, + prev_value, + ) = add_sum( + val, + nobs, + sum_x, + compensation_add, + num_consecutive_same_value, + prev_value, + 
) + + if nobs == 0 == min_periods: + result = 0 + elif nobs >= min_periods: + if num_consecutive_same_value >= nobs: + result = prev_value * nobs else: - result = na_val - na_pos = process_na_func(i, na_pos) - - output[i] = result - - if not is_monotonic_increasing_bounds: - nobs = 0 - sum_x = 0 - compensation_remove = 0 - - return output, na_pos - - if isinstance(dtype, numba.types.Integer): - - def sliding_sum_with_nan( - values: np.ndarray, start: np.ndarray, end: np.ndarray, min_periods: int - ): - output, na_pos = sliding_sum(values, start, end, min_periods) - output = output.astype("float64") - # Fancy indexing w/ list doesn't yet work for numba - # xref https://github.com/numba/numba/issues/8616 - # output[na_pos] = np.nan - for pos in na_pos: - output[pos] = np.nan - return output + result = sum_x + else: + result = na_val + if dtype.kind == "i": + na_pos.append(i) - return sliding_sum_with_nan - else: + output[i] = result - def postprocess_sliding_sum( - values: np.ndarray, start: np.ndarray, end: np.ndarray, min_periods: int - ): - return sliding_sum(values, start, end, min_periods)[0] + if not is_monotonic_increasing_bounds: + nobs = 0 + sum_x = 0 + compensation_remove = 0 - return postprocess_sliding_sum + return output, na_pos diff --git a/pandas/core/_numba/kernels/var_.py b/pandas/core/_numba/kernels/var_.py index d3243f4928dca..d94c1cbf53282 100644 --- a/pandas/core/_numba/kernels/var_.py +++ b/pandas/core/_numba/kernels/var_.py @@ -68,6 +68,7 @@ def remove_var( @numba.jit(nopython=True, nogil=True, parallel=False) def sliding_var( values: np.ndarray, + result_dtype: np.dtype, start: np.ndarray, end: np.ndarray, min_periods: int, @@ -85,7 +86,7 @@ def sliding_var( start ) and is_monotonic_increasing(end) - output = np.empty(N, dtype=np.float64) + output = np.empty(N, dtype=result_dtype) for i in range(N): s = start[i] @@ -154,4 +155,8 @@ def sliding_var( ssqdm_x = 0.0 compensation_remove = 0.0 - return output + # na_position is empty list since float64 can already hold nans + # Do list comprehension, since numba cannot figure out that na_pos is + # empty list of ints on its own + na_pos = [0 for i in range(0)] + return output, na_pos diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index 014e7031f3225..92c0c12446522 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -1375,24 +1375,17 @@ def _numba_agg_general( func, dtype_mapping, **get_jit_arguments(engine_kwargs) ) result = sorted_df._mgr.apply( - aggregator, start=starts, end=ends, min_periods=0, **aggregator_kwargs + aggregator, start=starts, end=ends, **aggregator_kwargs ) result.axes[1] = self.grouper.result_index result = df._constructor(result) - # result = aggregator(sorted_data, starts, ends, 0, *aggregator_args) - # index = self.grouper.result_index if data.ndim == 1: - # result_kwargs = {"name": data.name} - # result = result.ravel() result.name = data.name result = result.squeeze("columns") else: - # result_kwargs = {"columns": data.columns} result.columns = data.columns - # result.index = index return result - # return data._constructor(result, index=index, **result_kwargs) @final def _transform_with_numba(self, func, *args, engine_kwargs=None, **kwargs): @@ -1986,7 +1979,9 @@ def mean( dtype_mapping[np.dtype("uint32")] = np.float64 dtype_mapping[np.dtype("uint64")] = np.float64 - return self._numba_agg_general(sliding_mean, dtype_mapping, engine_kwargs) + return self._numba_agg_general( + sliding_mean, dtype_mapping, engine_kwargs, min_periods=0 + ) 
else: result = self._cython_agg_general( "mean", @@ -2090,7 +2085,7 @@ def std( return np.sqrt( self._numba_agg_general( - sliding_var, dtype_mapping, engine_kwargs, ddof=ddof + sliding_var, dtype_mapping, engine_kwargs, min_periods=0, ddof=ddof ) ) else: @@ -2167,7 +2162,7 @@ def var( dtype_mapping[np.dtype("uint64")] = np.float64 return self._numba_agg_general( - sliding_var, dtype_mapping, engine_kwargs, ddof=ddof + sliding_var, dtype_mapping, engine_kwargs, min_periods=0, ddof=ddof ) else: return self._cython_agg_general( @@ -2397,6 +2392,7 @@ def sum( sliding_sum, dtype_mapping, engine_kwargs, + min_periods=min_count, ) else: # If we are grouping on categoricals we want unobserved categories to @@ -2445,7 +2441,11 @@ def min( } return self._numba_agg_general( - sliding_min_max, dtype_mapping, engine_kwargs, is_max=False + sliding_min_max, + dtype_mapping, + engine_kwargs, + min_periods=min_count, + is_max=False, ) else: return self._agg_general( @@ -2481,7 +2481,11 @@ def max( } return self._numba_agg_general( - sliding_min_max, dtype_mapping, engine_kwargs, is_max=True + sliding_min_max, + dtype_mapping, + engine_kwargs, + min_periods=min_count, + is_max=True, ) else: return self._agg_general( diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 86b4e399fa461..a89895cd66094 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -624,7 +624,7 @@ def _numba_apply( self, func: Callable[..., Any], engine_kwargs: dict[str, bool] | None = None, - *func_args, + **func_kwargs, ): window_indexer = self._get_window_indexer() min_periods = ( @@ -646,10 +646,27 @@ def _numba_apply( step=self.step, ) self._check_window_bounds(start, end, len(values)) + # For now, map everything to float to match the Cython impl + # even though it is wrong + # TODO: Could preserve correct dtypes in future + dtype_mapping = { + np.dtype("int8"): np.float64, + np.dtype("int16"): np.float64, + np.dtype("int32"): np.float64, + np.dtype("int64"): np.float64, + np.dtype("uint8"): np.float64, + np.dtype("uint16"): np.float64, + np.dtype("uint32"): np.float64, + np.dtype("uint64"): np.float64, + np.dtype("float32"): np.float64, + np.dtype("float64"): np.float64, + np.dtype("complex64"): np.float64, + np.dtype("complex128"): np.float64, + } aggregator = executor.generate_shared_aggregator( - func, **get_jit_arguments(engine_kwargs) + func, dtype_mapping, **get_jit_arguments(engine_kwargs) ) - result = aggregator(values, start, end, min_periods, *func_args) + result = aggregator(values.T, start, end, min_periods, **func_kwargs).T result = result.T if self.axis == 1 else result index = self._slice_axis_for_step(obj.index, result) if obj.ndim == 1: @@ -1466,7 +1483,7 @@ def max( else: from pandas.core._numba.kernels import sliding_min_max - return self._numba_apply(sliding_min_max, engine_kwargs, True) + return self._numba_apply(sliding_min_max, engine_kwargs, is_max=True) window_func = window_aggregations.roll_max return self._apply(window_func, name="max", numeric_only=numeric_only) @@ -1488,7 +1505,7 @@ def min( else: from pandas.core._numba.kernels import sliding_min_max - return self._numba_apply(sliding_min_max, engine_kwargs, False) + return self._numba_apply(sliding_min_max, engine_kwargs, is_max=False) window_func = window_aggregations.roll_min return self._apply(window_func, name="min", numeric_only=numeric_only) @@ -1547,7 +1564,7 @@ def std( raise NotImplementedError("std not supported with method='table'") from pandas.core._numba.kernels import sliding_var - 
return zsqrt(self._numba_apply(sliding_var, engine_kwargs, ddof)) + return zsqrt(self._numba_apply(sliding_var, engine_kwargs, ddof=ddof)) window_func = window_aggregations.roll_var def zsqrt_func(values, begin, end, min_periods): @@ -1571,7 +1588,7 @@ def var( raise NotImplementedError("var not supported with method='table'") from pandas.core._numba.kernels import sliding_var - return self._numba_apply(sliding_var, engine_kwargs, ddof) + return self._numba_apply(sliding_var, engine_kwargs, ddof=ddof) window_func = partial(window_aggregations.roll_var, ddof=ddof) return self._apply( window_func, diff --git a/pandas/tests/groupby/conftest.py b/pandas/tests/groupby/conftest.py index 7e7b97d9273dc..c5e30513f69de 100644 --- a/pandas/tests/groupby/conftest.py +++ b/pandas/tests/groupby/conftest.py @@ -196,8 +196,23 @@ def nopython(request): ("sum", {}), ("min", {}), ("max", {}), + ("sum", {"min_count": 2}), + ("min", {"min_count": 2}), + ("max", {"min_count": 2}), + ], + ids=[ + "mean", + "var_1", + "var_0", + "std_1", + "std_0", + "sum", + "min", + "max", + "sum-min_count", + "min-min_count", + "max-min_count", ], - ids=["mean", "var_1", "var_0", "std_1", "std_0", "sum", "min", "max"], ) def numba_supported_reductions(request): """reductions supported with engine='numba'""" From 9f2f70dc4b023c2e2e6133eb0af57642c254656c Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Mon, 5 Jun 2023 13:58:54 -0700 Subject: [PATCH 03/10] fix CI --- asv_bench/benchmarks/groupby.py | 75 +++++++++++++++++++++----- pandas/core/_numba/kernels/mean_.py | 2 +- pandas/core/_numba/kernels/min_max_.py | 2 +- pandas/core/_numba/kernels/sum_.py | 2 +- pandas/core/_numba/kernels/var_.py | 2 +- 5 files changed, 66 insertions(+), 17 deletions(-) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index 1dd90bbf48dd4..bb501deae9323 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -58,6 +58,38 @@ }, } +# These aggregations don't have a kernel implemented for them yet +_numba_unsupported_methods = [ + "all", + "any", + "bfill", + "count", + "cumcount", + "cummax", + "cummin", + "cumprod", + "cumsum", + "describe", + "diff", + "ffill", + "first", + "head", + "last", + "median", + "nunique", + "pct_change", + "prod", + "quantile", + "rank", + "sem", + "shift", + "size", + "skew", + "tail", + "unique", + "value_counts", +] + class ApplyDictReturn: def setup(self): @@ -454,9 +486,10 @@ class GroupByMethods: ], ["direct", "transformation"], [1, 5], + ["cython", "numba"], ] - def setup(self, dtype, method, application, ncols): + def setup(self, dtype, method, application, ncols, engine): if method in method_blocklist.get(dtype, {}): raise NotImplementedError # skip benchmark @@ -475,6 +508,9 @@ def setup(self, dtype, method, application, ncols): # DataFrameGroupBy doesn't have these methods raise NotImplementedError + if engine == "numba" and method in _numba_unsupported_methods or ncols > 1: + raise NotImplementedError + if method == "describe": ngroups = 20 elif method == "skew": @@ -506,26 +542,39 @@ def setup(self, dtype, method, application, ncols): if len(cols) == 1: cols = cols[0] + # Not everything supports the engine keyword yet + kwargs = {} + if engine == "numba": + kwargs["engine"] = engine + if application == "transformation": - self.as_group_method = lambda: df.groupby("key")[cols].transform(method) - self.as_field_method = lambda: df.groupby(cols)["key"].transform(method) + self.as_group_method = lambda: 
df.groupby("key")[cols].transform( + method, **kwargs + ) + self.as_field_method = lambda: df.groupby(cols)["key"].transform( + method, **kwargs + ) else: - self.as_group_method = getattr(df.groupby("key")[cols], method) - self.as_field_method = getattr(df.groupby(cols)["key"], method) + self.as_group_method = partial( + getattr(df.groupby("key")[cols], method), **kwargs + ) + self.as_field_method = partial( + getattr(df.groupby(cols)["key"], method), **kwargs + ) - def time_dtype_as_group(self, dtype, method, application, ncols): + def time_dtype_as_group(self, dtype, method, application, ncols, engine): self.as_group_method() - def time_dtype_as_group_numba(self, dtype, method, application, ncols): - with pd.option_context("compute.use_numba", True): - self.as_group_method() + # def time_dtype_as_group_numba(self, dtype, method, application, ncols): + # with pd.option_context("compute.use_numba", True): + # self.as_group_method() - def time_dtype_as_field(self, dtype, method, application, ncols): + def time_dtype_as_field(self, dtype, method, application, ncols, engine): self.as_field_method() - def time_dtype_as_field_numba(self, dtype, method, application, ncols): - with pd.option_context("compute.use_numba", True): - self.as_field_method() + # def time_dtype_as_field_numba(self, dtype, method, application, ncols): + # with pd.option_context("compute.use_numba", True): + # self.as_field_method() class GroupByCythonAgg: diff --git a/pandas/core/_numba/kernels/mean_.py b/pandas/core/_numba/kernels/mean_.py index aedfa0fae9955..8774ff72af852 100644 --- a/pandas/core/_numba/kernels/mean_.py +++ b/pandas/core/_numba/kernels/mean_.py @@ -64,7 +64,7 @@ def sliding_mean( start: np.ndarray, end: np.ndarray, min_periods: int, -) -> np.ndarray: +) -> tuple[np.ndarray, list[int]]: N = len(start) nobs = 0 sum_x = 0.0 diff --git a/pandas/core/_numba/kernels/min_max_.py b/pandas/core/_numba/kernels/min_max_.py index 9cb649785ba03..814deeee9d0d5 100644 --- a/pandas/core/_numba/kernels/min_max_.py +++ b/pandas/core/_numba/kernels/min_max_.py @@ -20,7 +20,7 @@ def sliding_min_max( end: np.ndarray, min_periods: int, is_max: bool, -) -> np.ndarray: +) -> tuple[np.ndarray, list[int]]: N = len(start) nobs = 0 output = np.empty(N, dtype=result_dtype) diff --git a/pandas/core/_numba/kernels/sum_.py b/pandas/core/_numba/kernels/sum_.py index eadb8f4ff6a34..081acda2b0f2c 100644 --- a/pandas/core/_numba/kernels/sum_.py +++ b/pandas/core/_numba/kernels/sum_.py @@ -59,7 +59,7 @@ def sliding_sum( start: np.ndarray, end: np.ndarray, min_periods: int, -) -> np.ndarray: +) -> tuple[np.ndarray, list[int]]: dtype = values.dtype if dtype.kind == "i": diff --git a/pandas/core/_numba/kernels/var_.py b/pandas/core/_numba/kernels/var_.py index d94c1cbf53282..e0f46ba6e3805 100644 --- a/pandas/core/_numba/kernels/var_.py +++ b/pandas/core/_numba/kernels/var_.py @@ -73,7 +73,7 @@ def sliding_var( end: np.ndarray, min_periods: int, ddof: int = 1, -) -> np.ndarray: +) -> tuple[np.ndarray, list[int]]: N = len(start) nobs = 0 mean_x = 0.0 From 00ce652e5693f4a161520b20fba4143d6b3b6db6 Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Mon, 5 Jun 2023 19:26:45 -0700 Subject: [PATCH 04/10] maybe green? 
--- asv_bench/benchmarks/groupby.py | 23 +++++++++++++---------- pandas/core/_numba/executor.py | 3 ++- pandas/core/_numba/kernels/sum_.py | 21 +++++++++++---------- pandas/core/groupby/groupby.py | 10 ++++++---- pandas/core/window/rolling.py | 2 +- 5 files changed, 33 insertions(+), 26 deletions(-) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index bb501deae9323..70b89bc174f16 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -486,7 +486,8 @@ class GroupByMethods: ], ["direct", "transformation"], [1, 5], - ["cython", "numba"], + # ["cython", "numba"], + ["numba"], ] def setup(self, dtype, method, application, ncols, engine): @@ -508,7 +509,17 @@ def setup(self, dtype, method, application, ncols, engine): # DataFrameGroupBy doesn't have these methods raise NotImplementedError - if engine == "numba" and method in _numba_unsupported_methods or ncols > 1: + # Numba currently doesn't support + # multiple transform functions or strs for transform, + # grouping on multiple columns + # and we lack kernels for a bunch of methods + if ( + engine == "numba" + and method in _numba_unsupported_methods + or ncols > 1 + or application == "transformation" + or dtype == "datetime" + ): raise NotImplementedError if method == "describe": @@ -565,17 +576,9 @@ def setup(self, dtype, method, application, ncols, engine): def time_dtype_as_group(self, dtype, method, application, ncols, engine): self.as_group_method() - # def time_dtype_as_group_numba(self, dtype, method, application, ncols): - # with pd.option_context("compute.use_numba", True): - # self.as_group_method() - def time_dtype_as_field(self, dtype, method, application, ncols, engine): self.as_field_method() - # def time_dtype_as_field_numba(self, dtype, method, application, ncols): - # with pd.option_context("compute.use_numba", True): - # self.as_field_method() - class GroupByCythonAgg: """ diff --git a/pandas/core/_numba/executor.py b/pandas/core/_numba/executor.py index dc680a4c39ffb..15a5c295da617 100644 --- a/pandas/core/_numba/executor.py +++ b/pandas/core/_numba/executor.py @@ -8,6 +8,7 @@ if TYPE_CHECKING: from pandas._typing import Scalar + from typing import Any import numpy as np @@ -43,7 +44,7 @@ def column_looper( return column_looper -default_dtype_mapping = { +default_dtype_mapping: dict[np.dtype, Any] = { np.dtype("int8"): np.int64, np.dtype("int16"): np.int64, np.dtype("int32"): np.int64, diff --git a/pandas/core/_numba/kernels/sum_.py b/pandas/core/_numba/kernels/sum_.py index 081acda2b0f2c..e834f1410f51a 100644 --- a/pandas/core/_numba/kernels/sum_.py +++ b/pandas/core/_numba/kernels/sum_.py @@ -8,6 +8,8 @@ """ from __future__ import annotations +from typing import Any + import numba import numpy as np @@ -16,13 +18,13 @@ @numba.jit(nopython=True, nogil=True, parallel=False) def add_sum( - val: float, + val: Any, nobs: int, - sum_x: float, - compensation: float, + sum_x: Any, + compensation: Any, num_consecutive_same_value: int, - prev_value: float, -) -> tuple[int, float, float, int, float]: + prev_value: Any, +) -> tuple[int, Any, Any, int, Any]: if not np.isnan(val): nobs += 1 y = val - compensation @@ -41,8 +43,8 @@ def add_sum( @numba.jit(nopython=True, nogil=True, parallel=False) def remove_sum( - val: float, nobs: int, sum_x: float, compensation: float -) -> tuple[int, float, float]: + val: Any, nobs: int, sum_x: Any, compensation: Any +) -> tuple[int, Any, Any]: if not np.isnan(val): nobs -= 1 y = -val - compensation @@ -62,10 +64,9 @@ def sliding_sum( ) 
-> tuple[np.ndarray, list[int]]: dtype = values.dtype + na_val: object = np.nan if dtype.kind == "i": na_val = 0 - else: - na_val = np.nan N = len(start) nobs = 0 @@ -128,7 +129,7 @@ def sliding_sum( ) if nobs == 0 == min_periods: - result = 0 + result: object = 0 elif nobs >= min_periods: if num_consecutive_same_value >= nobs: result = prev_value * nobs diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index f6d82650ea6d3..df5cb00b63a71 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -135,6 +135,8 @@ class providing the base-class of operations. ) if TYPE_CHECKING: + from typing import Any + from pandas.core.window import ( ExpandingGroupby, ExponentialMovingWindowGroupby, @@ -1347,7 +1349,7 @@ def _numba_prep(self, data: DataFrame): def _numba_agg_general( self, func: Callable, - dtype_mapping: dict[np.dtype, np.dtype], + dtype_mapping: dict[np.dtype, Any], engine_kwargs: dict[str, bool] | None, **aggregator_kwargs, ): @@ -1381,8 +1383,8 @@ def _numba_agg_general( result = df._constructor(result) if data.ndim == 1: - result.name = data.name result = result.squeeze("columns") + result.name = data.name else: result.columns = data.columns return result @@ -2427,7 +2429,7 @@ def min( if maybe_use_numba(engine): from pandas.core._numba.kernels import sliding_min_max - dtype_mapping = { + dtype_mapping: dict[np.dtype, Any] = { np.dtype("int8"): np.int8, np.dtype("int16"): np.int16, np.dtype("int32"): np.int32, @@ -2467,7 +2469,7 @@ def max( if maybe_use_numba(engine): from pandas.core._numba.kernels import sliding_min_max - dtype_mapping = { + dtype_mapping: dict[np.dtype, Any] = { np.dtype("int8"): np.int8, np.dtype("int16"): np.int16, np.dtype("int32"): np.int32, diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index a89895cd66094..497f029bc14e4 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -649,7 +649,7 @@ def _numba_apply( # For now, map everything to float to match the Cython impl # even though it is wrong # TODO: Could preserve correct dtypes in future - dtype_mapping = { + dtype_mapping: dict[np.dtype, Any] = { np.dtype("int8"): np.float64, np.dtype("int16"): np.float64, np.dtype("int32"): np.float64, From 64ecaec5d361281c707320c6ffe4e24c8cc70130 Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Tue, 6 Jun 2023 14:14:13 -0700 Subject: [PATCH 05/10] skip unsupported ops in other bench as well --- asv_bench/benchmarks/groupby.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index 70b89bc174f16..91ae2dd04b244 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -622,6 +622,11 @@ class GroupByNumbaAgg(GroupByCythonAgg): time is actually spent in the grouped aggregation). 
""" + def setup(self, dtype, method): + if method in _numba_unsupported_methods: + raise NotImplementedError + super().setup(dtype, method) + def time_frame_agg(self, dtype, method): with pd.option_context("compute.use_numba", True): self.df.groupby("key").agg(method) From 4d58a47bf7ab1d42cbb2c6a529d54f418f997cdc Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Fri, 9 Jun 2023 11:51:43 -0700 Subject: [PATCH 06/10] updates from code review --- asv_bench/benchmarks/groupby.py | 3 +- pandas/core/_numba/executor.py | 35 ++++++++- pandas/core/groupby/groupby.py | 81 ++++---------------- pandas/core/window/rolling.py | 16 +--- pandas/tests/groupby/aggregate/test_numba.py | 4 +- 5 files changed, 53 insertions(+), 86 deletions(-) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index 91ae2dd04b244..7098a244e3018 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -486,8 +486,7 @@ class GroupByMethods: ], ["direct", "transformation"], [1, 5], - # ["cython", "numba"], - ["numba"], + ["cython", "numba"], ] def setup(self, dtype, method, application, ncols, engine): diff --git a/pandas/core/_numba/executor.py b/pandas/core/_numba/executor.py index 15a5c295da617..24599148356fa 100644 --- a/pandas/core/_numba/executor.py +++ b/pandas/core/_numba/executor.py @@ -3,12 +3,12 @@ import functools from typing import ( TYPE_CHECKING, + Any, Callable, ) if TYPE_CHECKING: from pandas._typing import Scalar - from typing import Any import numpy as np @@ -55,6 +55,39 @@ def column_looper( np.dtype("uint64"): np.uint64, np.dtype("float32"): np.float64, np.dtype("float64"): np.float64, + np.dtype("complex64"): np.complex128, + np.dtype("complex128"): np.complex128, +} + + +# TODO: Preserve complex dtypes + +float_dtype_mapping: dict[np.dtype, Any] = { + np.dtype("int8"): np.float64, + np.dtype("int16"): np.float64, + np.dtype("int32"): np.float64, + np.dtype("int64"): np.float64, + np.dtype("uint8"): np.float64, + np.dtype("uint16"): np.float64, + np.dtype("uint32"): np.float64, + np.dtype("uint64"): np.float64, + np.dtype("float32"): np.float64, + np.dtype("float64"): np.float64, + np.dtype("complex64"): np.float64, + np.dtype("complex128"): np.float64, +} + +identity_dtype_mapping: dict[np.dtype, Any] = { + np.dtype("int8"): np.int8, + np.dtype("int16"): np.int16, + np.dtype("int32"): np.int32, + np.dtype("int64"): np.int64, + np.dtype("uint8"): np.uint8, + np.dtype("uint16"): np.uint16, + np.dtype("uint32"): np.uint32, + np.dtype("uint64"): np.uint64, + np.dtype("float32"): np.float32, + np.dtype("float64"): np.float64, np.dtype("complex64"): np.complex64, np.dtype("complex128"): np.complex128, } diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index df5cb00b63a71..a4cd1fddd7b01 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -1970,19 +1970,8 @@ def mean( if maybe_use_numba(engine): from pandas.core._numba.kernels import sliding_mean - dtype_mapping = executor.default_dtype_mapping.copy() - # Need to map all integer types -> float - dtype_mapping[np.dtype("int8")] = np.float64 - dtype_mapping[np.dtype("int16")] = np.float64 - dtype_mapping[np.dtype("int32")] = np.float64 - dtype_mapping[np.dtype("int64")] = np.float64 - dtype_mapping[np.dtype("uint8")] = np.float64 - dtype_mapping[np.dtype("uint16")] = np.float64 - dtype_mapping[np.dtype("uint32")] = np.float64 - dtype_mapping[np.dtype("uint64")] = np.float64 - return self._numba_agg_general( - 
sliding_mean, dtype_mapping, engine_kwargs, min_periods=0 + sliding_mean, executor.float_dtype_mapping, engine_kwargs, min_periods=0 ) else: result = self._cython_agg_general( @@ -2074,20 +2063,13 @@ def std( if maybe_use_numba(engine): from pandas.core._numba.kernels import sliding_var - dtype_mapping = executor.default_dtype_mapping.copy() - # Need to map all integer types -> float - dtype_mapping[np.dtype("int8")] = np.float64 - dtype_mapping[np.dtype("int16")] = np.float64 - dtype_mapping[np.dtype("int32")] = np.float64 - dtype_mapping[np.dtype("int64")] = np.float64 - dtype_mapping[np.dtype("uint8")] = np.float64 - dtype_mapping[np.dtype("uint16")] = np.float64 - dtype_mapping[np.dtype("uint32")] = np.float64 - dtype_mapping[np.dtype("uint64")] = np.float64 - return np.sqrt( self._numba_agg_general( - sliding_var, dtype_mapping, engine_kwargs, min_periods=0, ddof=ddof + sliding_var, + executor.float_dtype_mapping, + engine_kwargs, + min_periods=0, + ddof=ddof, ) ) else: @@ -2152,19 +2134,12 @@ def var( if maybe_use_numba(engine): from pandas.core._numba.kernels import sliding_var - dtype_mapping = executor.default_dtype_mapping.copy() - # Need to map all integer types -> float - dtype_mapping[np.dtype("int8")] = np.float64 - dtype_mapping[np.dtype("int16")] = np.float64 - dtype_mapping[np.dtype("int32")] = np.float64 - dtype_mapping[np.dtype("int64")] = np.float64 - dtype_mapping[np.dtype("uint8")] = np.float64 - dtype_mapping[np.dtype("uint16")] = np.float64 - dtype_mapping[np.dtype("uint32")] = np.float64 - dtype_mapping[np.dtype("uint64")] = np.float64 - return self._numba_agg_general( - sliding_var, dtype_mapping, engine_kwargs, min_periods=0, ddof=ddof + sliding_var, + executor.float_dtype_mapping, + engine_kwargs, + min_periods=0, + ddof=ddof, ) else: return self._cython_agg_general( @@ -2388,11 +2363,9 @@ def sum( if maybe_use_numba(engine): from pandas.core._numba.kernels import sliding_sum - dtype_mapping = executor.default_dtype_mapping.copy() - return self._numba_agg_general( sliding_sum, - dtype_mapping, + executor.default_dtype_mapping, engine_kwargs, min_periods=min_count, ) @@ -2429,22 +2402,9 @@ def min( if maybe_use_numba(engine): from pandas.core._numba.kernels import sliding_min_max - dtype_mapping: dict[np.dtype, Any] = { - np.dtype("int8"): np.int8, - np.dtype("int16"): np.int16, - np.dtype("int32"): np.int32, - np.dtype("int64"): np.int64, - np.dtype("uint8"): np.uint8, - np.dtype("uint16"): np.uint16, - np.dtype("uint32"): np.uint32, - np.dtype("uint64"): np.uint64, - np.dtype("float32"): np.float32, - np.dtype("float64"): np.float64, - } - return self._numba_agg_general( sliding_min_max, - dtype_mapping, + executor.identity_dtype_mapping, engine_kwargs, min_periods=min_count, is_max=False, @@ -2469,22 +2429,9 @@ def max( if maybe_use_numba(engine): from pandas.core._numba.kernels import sliding_min_max - dtype_mapping: dict[np.dtype, Any] = { - np.dtype("int8"): np.int8, - np.dtype("int16"): np.int16, - np.dtype("int32"): np.int32, - np.dtype("int64"): np.int64, - np.dtype("uint8"): np.uint8, - np.dtype("uint16"): np.uint16, - np.dtype("uint32"): np.uint32, - np.dtype("uint64"): np.uint64, - np.dtype("float32"): np.float32, - np.dtype("float64"): np.float64, - } - return self._numba_agg_general( sliding_min_max, - dtype_mapping, + executor.identity_dtype_mapping, engine_kwargs, min_periods=min_count, is_max=True, diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 497f029bc14e4..200ff62b590a0 100644 --- 
a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -649,20 +649,8 @@ def _numba_apply( # For now, map everything to float to match the Cython impl # even though it is wrong # TODO: Could preserve correct dtypes in future - dtype_mapping: dict[np.dtype, Any] = { - np.dtype("int8"): np.float64, - np.dtype("int16"): np.float64, - np.dtype("int32"): np.float64, - np.dtype("int64"): np.float64, - np.dtype("uint8"): np.float64, - np.dtype("uint16"): np.float64, - np.dtype("uint32"): np.float64, - np.dtype("uint64"): np.float64, - np.dtype("float32"): np.float64, - np.dtype("float64"): np.float64, - np.dtype("complex64"): np.float64, - np.dtype("complex128"): np.float64, - } + # xref #53214 + dtype_mapping = executor.float_dtype_mapping aggregator = executor.generate_shared_aggregator( func, dtype_mapping, **get_jit_arguments(engine_kwargs) ) diff --git a/pandas/tests/groupby/aggregate/test_numba.py b/pandas/tests/groupby/aggregate/test_numba.py index a82c4d0d8ffbc..68fc0a1920bb4 100644 --- a/pandas/tests/groupby/aggregate/test_numba.py +++ b/pandas/tests/groupby/aggregate/test_numba.py @@ -155,8 +155,7 @@ def test_multifunc_numba_vs_cython_frame(agg_kwargs): grouped = data.groupby(0) result = grouped.agg(**agg_kwargs, engine="numba") expected = grouped.agg(**agg_kwargs, engine="cython") - # check_dtype can be removed if GH 44952 is addressed - tm.assert_frame_equal(result, expected, check_dtype=False) + tm.assert_frame_equal(result, expected) @td.skip_if_no("numba") @@ -192,6 +191,7 @@ def test_multifunc_numba_udf_frame(agg_kwargs, expected_func): result = grouped.agg(**agg_kwargs, engine="numba") expected = grouped.agg(expected_func, engine="cython") # check_dtype can be removed if GH 44952 is addressed + # Currently, UDFs still always return float64 while reductions can preserve dtype tm.assert_frame_equal(result, expected, check_dtype=False) From c6d4ffe647ed63560541cffea489ca7a82ea407b Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Mon, 12 Jun 2023 08:21:28 -0700 Subject: [PATCH 07/10] remove commented code --- pandas/core/groupby/groupby.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index a4cd1fddd7b01..a4b0a9c6a7197 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -1367,8 +1367,6 @@ def _numba_agg_general( data = self._obj_with_exclusions df = data if data.ndim == 2 else data.to_frame() - # starts, ends, sorted_index, sorted_data = self._numba_prep(df) - sorted_df = df.take(self.grouper._sort_idx, axis=self.axis) sorted_ids = self.grouper._sorted_ids _, _, ngroups = self.grouper.group_info From d05ebdfb1aee22a1a7484c15356ea2c45d046a0d Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Tue, 13 Jun 2023 07:09:11 -0700 Subject: [PATCH 08/10] update whatsnew --- asv_bench/benchmarks/groupby.py | 4 +--- doc/source/whatsnew/v2.1.0.rst | 1 + 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index 7098a244e3018..273ec4a6cb020 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -4,7 +4,6 @@ import numpy as np -import pandas as pd from pandas import ( NA, Categorical, @@ -627,8 +626,7 @@ def setup(self, dtype, method): super().setup(dtype, method) def time_frame_agg(self, dtype, method): - with pd.option_context("compute.use_numba", True): - self.df.groupby("key").agg(method) + 
self.df.groupby("key").agg(method, engine="numba") class GroupByCythonAggEaDtypes: diff --git a/doc/source/whatsnew/v2.1.0.rst b/doc/source/whatsnew/v2.1.0.rst index 762f41b4049c2..a6e75e0a4e7b9 100644 --- a/doc/source/whatsnew/v2.1.0.rst +++ b/doc/source/whatsnew/v2.1.0.rst @@ -105,6 +105,7 @@ Other enhancements - :meth:`SeriesGroupby.agg` and :meth:`DataFrameGroupby.agg` now support passing in multiple functions for ``engine="numba"`` (:issue:`53486`) - Added ``engine_kwargs`` parameter to :meth:`DataFrame.to_excel` (:issue:`53220`) - Added a new parameter ``by_row`` to :meth:`Series.apply`. When set to ``False`` the supplied callables will always operate on the whole Series (:issue:`53400`). +- Groupby aggregations (such as :meth:`DataFrameGroupby.sum`) now can preserve the dtype of the input instead of casting to ``float64`` (:issue:`44952`) - Many read/to_* functions, such as :meth:`DataFrame.to_pickle` and :func:`read_csv`, support forwarding compression arguments to lzma.LZMAFile (:issue:`52979`) - Performance improvement in :func:`concat` with homogeneous ``np.float64`` or ``np.float32`` dtypes (:issue:`52685`) - Performance improvement in :meth:`DataFrame.filter` when ``items`` is given (:issue:`52941`) From e67bbebb58edffad8edf8dc5c2ced4ae624fc389 Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Tue, 13 Jun 2023 09:51:43 -0700 Subject: [PATCH 09/10] debug benchmarks --- .github/workflows/code-checks.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/code-checks.yml b/.github/workflows/code-checks.yml index 4ad2fbc71c8c1..bdc11ac693cff 100644 --- a/.github/workflows/code-checks.yml +++ b/.github/workflows/code-checks.yml @@ -143,7 +143,7 @@ jobs: run: | cd asv_bench asv machine --yes - asv run --quick --dry-run --strict --durations=30 --python=same + asv run --quick --dry-run --strict --durations=30 --python=same --show-stderr build_docker_dev_environment: name: Build Docker Dev Environment From b0d22dbd87b599634d2a8c16252cf9ed18c4f172 Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Wed, 14 Jun 2023 16:00:03 -0700 Subject: [PATCH 10/10] Skip min/max benchmarks --- .github/workflows/code-checks.yml | 2 +- asv_bench/benchmarks/groupby.py | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/.github/workflows/code-checks.yml b/.github/workflows/code-checks.yml index 645136474486e..f6c35decfd30b 100644 --- a/.github/workflows/code-checks.yml +++ b/.github/workflows/code-checks.yml @@ -144,7 +144,7 @@ jobs: run: | cd asv_bench asv machine --yes - asv run --quick --dry-run --strict --durations=30 --python=same --show-stderr + asv run --quick --dry-run --strict --durations=30 --python=same build_docker_dev_environment: name: Build Docker Dev Environment diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index 273ec4a6cb020..6617b3c8b4cca 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -591,8 +591,12 @@ class GroupByCythonAgg: [ "sum", "prod", - "min", - "max", + # TODO: uncomment min/max + # Currently, min/max implemented very inefficiently + # because it re-uses the Window min/max kernel + # so it will time out ASVs + # "min", + # "max", "mean", "median", "var",