
ENH: Allow numba aggregations to return non-float64 results #53444

Merged (17 commits) on Jun 15, 2023
Changes from 6 commits
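
In short: the numba-based aggregation kernels below learn to allocate their result buffers with a dtype derived from the input instead of hard-coding float64. A sketch of the intended user-visible effect (illustrative only, not output captured from this branch):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame(
    {"key": [1, 1, 2], "val": np.array([5, 3, 7], dtype="int64")}
)

# Previously the numba engine always produced float64; with this change
# an aggregation that cannot produce NaN keeps an integer result dtype.
result = df.groupby("key").min(engine="numba")
print(result["val"].dtype)  # expected: int64 (was float64 before)
```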
87 changes: 80 additions & 7 deletions asv_bench/benchmarks/groupby.py
@@ -4,6 +4,7 @@

import numpy as np

import pandas as pd
from pandas import (
NA,
Categorical,
@@ -57,6 +58,38 @@
},
}

# These aggregations don't have a kernel implemented for them yet
_numba_unsupported_methods = [
"all",
"any",
"bfill",
"count",
"cumcount",
"cummax",
"cummin",
"cumprod",
"cumsum",
"describe",
"diff",
"ffill",
"first",
"head",
"last",
"median",
"nunique",
"pct_change",
"prod",
"quantile",
"rank",
"sem",
"shift",
"size",
"skew",
"tail",
"unique",
"value_counts",
]


class ApplyDictReturn:
def setup(self):
@@ -453,9 +486,11 @@ class GroupByMethods:
],
["direct", "transformation"],
[1, 5],
# ["cython", "numba"],
["numba"],
]

def setup(self, dtype, method, application, ncols):
def setup(self, dtype, method, application, ncols, engine):
if method in method_blocklist.get(dtype, {}):
raise NotImplementedError # skip benchmark

@@ -474,6 +509,19 @@ def setup(self, dtype, method, application, ncols):
# DataFrameGroupBy doesn't have these methods
raise NotImplementedError

# Numba currently doesn't support multiple transform functions,
# string aliases for transform, or grouping on multiple columns,
# and we lack kernels for a number of methods
if engine == "numba" and (
method in _numba_unsupported_methods
or ncols > 1
or application == "transformation"
or dtype == "datetime"
):
raise NotImplementedError

if method == "describe":
ngroups = 20
elif method == "skew":
@@ -505,17 +553,30 @@ def setup(self, dtype, method, application, ncols):
if len(cols) == 1:
cols = cols[0]

# Not everything supports the engine keyword yet
kwargs = {}
if engine == "numba":
kwargs["engine"] = engine

if application == "transformation":
self.as_group_method = lambda: df.groupby("key")[cols].transform(method)
self.as_field_method = lambda: df.groupby(cols)["key"].transform(method)
self.as_group_method = lambda: df.groupby("key")[cols].transform(
method, **kwargs
)
self.as_field_method = lambda: df.groupby(cols)["key"].transform(
method, **kwargs
)
else:
self.as_group_method = getattr(df.groupby("key")[cols], method)
self.as_field_method = getattr(df.groupby(cols)["key"], method)
self.as_group_method = partial(
getattr(df.groupby("key")[cols], method), **kwargs
)
self.as_field_method = partial(
getattr(df.groupby(cols)["key"], method), **kwargs
)

def time_dtype_as_group(self, dtype, method, application, ncols):
def time_dtype_as_group(self, dtype, method, application, ncols, engine):
self.as_group_method()

def time_dtype_as_field(self, dtype, method, application, ncols):
def time_dtype_as_field(self, dtype, method, application, ncols, engine):
self.as_field_method()


@@ -554,6 +615,18 @@ def time_frame_agg(self, dtype, method):
self.df.groupby("key").agg(method)


class GroupByNumbaAgg(GroupByCythonAgg):
"""
Benchmarks specifically targeting our numba aggregation algorithms
(using a big enough dataframe with simple key, so a large part of the
time is actually spent in the grouped aggregation).
"""

def time_frame_agg(self, dtype, method):
with pd.option_context("compute.use_numba", True):
self.df.groupby("key").agg(method)
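
GroupByNumbaAgg reuses GroupByCythonAgg's setup and only swaps the execution engine via the compute.use_numba option, which routes string aggregations like "mean" through the numba kernels. The two invocation styles below should be equivalent (a sketch; the benchmark's data sizes come from GroupByCythonAgg's setup, which is not shown in this hunk):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame(
    {"key": np.random.randint(0, 100, 10_000), "val": np.random.rand(10_000)}
)

# Option-based: every string aggregation in the block uses numba.
with pd.option_context("compute.use_numba", True):
    res_option = df.groupby("key").agg("mean")

# Keyword-based: opt in for a single call.
res_kwarg = df.groupby("key").mean(engine="numba")
```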


class GroupByCythonAggEaDtypes:
"""
Benchmarks specifically targeting our cython aggregation algorithms
94 changes: 78 additions & 16 deletions pandas/core/_numba/executor.py
@@ -8,15 +8,61 @@

if TYPE_CHECKING:
from pandas._typing import Scalar
from typing import Any

import numpy as np

from pandas.compat._optional import import_optional_dependency


@functools.cache
def make_looper(func, result_dtype, nopython, nogil, parallel):
if TYPE_CHECKING:
import numba
else:
numba = import_optional_dependency("numba")

@numba.jit(nopython=nopython, nogil=nogil, parallel=parallel)
def column_looper(
values: np.ndarray,
start: np.ndarray,
end: np.ndarray,
min_periods: int,
*args,
):
result = np.empty((values.shape[0], len(start)), dtype=result_dtype)
na_positions = {}
for i in numba.prange(values.shape[0]):
output, na_pos = func(
values[i], result_dtype, start, end, min_periods, *args
)
result[i] = output
if len(na_pos) > 0:
na_positions[i] = np.array(na_pos)
return result, na_positions

return column_looper
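
make_looper is wrapped in functools.cache, so each (func, result_dtype, nopython, nogil, parallel) combination is jit-compiled once and reused; without the cache, dispatching on dtype at the Python level would recompile the looper on every call. A standalone sketch of the pattern (toy function, not the pandas kernel):

```python
import functools

import numba
import numpy as np


@functools.cache
def make_adder(result_dtype):
    # One jit compilation per distinct result_dtype; later calls with
    # the same dtype return the already-compiled function.
    @numba.jit(nopython=True)
    def add_one(values):
        out = np.empty(len(values), dtype=result_dtype)
        for i in range(len(values)):
            out[i] = values[i] + 1
        return out

    return add_one


add_i64 = make_adder(np.int64)
assert make_adder(np.int64) is add_i64  # cache hit, no recompilation
print(add_i64(np.array([1, 2, 3], dtype="int64")))  # [2 3 4]
```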


default_dtype_mapping: dict[np.dtype, Any] = {
[Review comment from a Member]
Curious, could we not just define signatures for numba.jit to use when running the function?

[Reply from the Member Author]
We allocate arrays inside the function and need to pass a dtype there as well.
Not sure how to access the signature from inside the func.

np.dtype("int8"): np.int64,
np.dtype("int16"): np.int64,
np.dtype("int32"): np.int64,
np.dtype("int64"): np.int64,
np.dtype("uint8"): np.uint64,
np.dtype("uint16"): np.uint64,
np.dtype("uint32"): np.uint64,
np.dtype("uint64"): np.uint64,
np.dtype("float32"): np.float64,
np.dtype("float64"): np.float64,
np.dtype("complex64"): np.complex64,
np.dtype("complex128"): np.complex128,
}
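
The review thread above is the crux of the design: the output buffer is allocated inside the jitted kernel, so the result dtype has to travel in as a runtime argument rather than being baked into a numba signature. A minimal standalone sketch of that constraint (toy function, assuming a recent numba):

```python
import numba
import numpy as np


@numba.jit(nopython=True)
def alloc(n, result_dtype):
    # The dtype arrives as an argument; in nopython mode the function
    # has no way to introspect its own compiled signature.
    return np.empty(n, dtype=result_dtype)


print(alloc(3, np.int64).dtype)    # int64
print(alloc(3, np.float64).dtype)  # float64
```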


def generate_shared_aggregator(
func: Callable[..., Scalar],
dtype_mapping: dict[np.dtype, np.dtype],
nopython: bool,
nogil: bool,
parallel: bool,
@@ -29,6 +75,9 @@ def generate_shared_aggregator(
----------
func : function
aggregation function to be applied to each column
dtype_mapping: dict or None
If not None, maps a dtype to a result dtype.
Otherwise, will fall back to default mapping.
nopython : bool
nopython to be passed into numba.jit
nogil : bool
@@ -40,22 +89,35 @@
-------
Numba function
"""
if TYPE_CHECKING:
import numba
else:
numba = import_optional_dependency("numba")

@numba.jit(nopython=nopython, nogil=nogil, parallel=parallel)
def column_looper(
values: np.ndarray,
start: np.ndarray,
end: np.ndarray,
min_periods: int,
*args,
):
result = np.empty((len(start), values.shape[1]), dtype=np.float64)
for i in numba.prange(values.shape[1]):
result[:, i] = func(values[:, i], start, end, min_periods, *args)
# A wrapper around the looper function
# to dispatch based on dtype, since numba is unable to do that in nopython mode

# It also post-processes the result by inserting nans where the number of
# observations is less than min_periods.
# This cannot be done in numba nopython mode
# (you'd run into a type-unification error when casting int -> float)
def looper_wrapper(values, start, end, min_periods, **kwargs):
result_dtype = dtype_mapping[values.dtype]
column_looper = make_looper(func, result_dtype, nopython, nogil, parallel)
# Need to unpack kwargs since numba only supports *args
result, na_positions = column_looper(
values, start, end, min_periods, *kwargs.values()
)
if result.dtype.kind == "i":
# Check whether any na_positions entry is non-empty.
# If so, convert the whole block to float64.
# This is OK since an int dtype cannot hold nan, and if min_periods
# is not satisfied for one column, it is not satisfied for the
# other columns at that index either
for na_pos in na_positions.values():
if len(na_pos) > 0:
result = result.astype("float64")
break
# TODO: Optimize this
for i, na_pos in na_positions.items():
if len(na_pos) > 0:
result[i, na_pos] = np.nan
return result

return column_looper
return looper_wrapper
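
The int-to-float fallback at the end of looper_wrapper is worth seeing in isolation: integer blocks are upcast only when some kernel actually reported a missing window. A small sketch with hypothetical inputs (result laid out as (n_columns, n_windows), as in column_looper):

```python
import numpy as np

# Hypothetical output of column_looper: 2 int64 columns, 3 windows each.
result = np.array([[1, 2, 3], [4, 5, 6]], dtype="int64")
# Column 1 reported window 2 as having too few observations.
na_positions = {1: np.array([2])}

if result.dtype.kind == "i":
    # int64 cannot hold NaN, so the whole block is upcast once...
    if any(len(pos) > 0 for pos in na_positions.values()):
        result = result.astype("float64")
# ...and the reported positions are then blanked out per column.
for i, na_pos in na_positions.items():
    if len(na_pos) > 0:
        result[i, na_pos] = np.nan

print(result)  # row 1, window 2 is now nan; everything else is intact
```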
11 changes: 8 additions & 3 deletions pandas/core/_numba/kernels/mean_.py
@@ -60,10 +60,11 @@ def remove_mean(
@numba.jit(nopython=True, nogil=True, parallel=False)
def sliding_mean(
values: np.ndarray,
result_dtype: np.dtype,
start: np.ndarray,
end: np.ndarray,
min_periods: int,
) -> np.ndarray:
) -> tuple[np.ndarray, list[int]]:
N = len(start)
nobs = 0
sum_x = 0.0
@@ -75,7 +76,7 @@ def sliding_mean(
start
) and is_monotonic_increasing(end)

output = np.empty(N, dtype=np.float64)
output = np.empty(N, dtype=result_dtype)

for i in range(N):
s = start[i]
@@ -147,4 +148,8 @@ def sliding_mean(
neg_ct = 0
compensation_remove = 0.0

return output
# na_pos is an empty list, since float64 output can already hold nans.
# Use a list comprehension, since numba cannot infer on its own that
# na_pos is an empty list of ints
na_pos = [0 for i in range(0)]
return output, na_pos
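
The comprehension above is a numba typing workaround rather than style: a literal [] has no element type, and returning it fails type inference, while [0 for i in range(0)] is typed as a list of int64 despite being empty. A standalone demonstration (assuming a recent numba; reflected lists emit a deprecation warning but still work):

```python
import numba


@numba.jit(nopython=True)
def empty_int_positions():
    # Typed as list(int64) even though it is empty; a bare [] here
    # would make the return type untypable.
    na_pos = [0 for i in range(0)]
    return na_pos


print(empty_int_positions())  # []
```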
13 changes: 9 additions & 4 deletions pandas/core/_numba/kernels/min_max_.py
@@ -15,14 +15,16 @@
@numba.jit(nopython=True, nogil=True, parallel=False)
def sliding_min_max(
values: np.ndarray,
result_dtype: np.dtype,
start: np.ndarray,
end: np.ndarray,
min_periods: int,
is_max: bool,
) -> np.ndarray:
) -> tuple[np.ndarray, list[int]]:
N = len(start)
nobs = 0
output = np.empty(N, dtype=np.float64)
output = np.empty(N, dtype=result_dtype)
na_pos = []
# Use deque once numba supports it
# https://github.com/numba/numba/issues/7417
Q: list = []
@@ -64,6 +66,9 @@ def sliding_min_max(
if Q and curr_win_size > 0 and nobs >= min_periods:
output[i] = values[Q[0]]
else:
output[i] = np.nan
if values.dtype.kind != "i":
output[i] = np.nan
else:
na_pos.append(i)

return output
return output, na_pos
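
For context, the Q list in this kernel is a monotonic queue of candidate indices, standing in for a deque until numba supports one (see the linked issue). Stripped of the NA bookkeeping and variable-width windows, the core algorithm is the classic O(n) sliding minimum, sketched here for fixed windows:

```python
from collections import deque


def sliding_min(values: list[int], window: int) -> list[int]:
    q: deque[int] = deque()  # indices whose values are increasing
    out = []
    for i, v in enumerate(values):
        # Drop candidates that can never be the minimum again.
        while q and values[q[-1]] >= v:
            q.pop()
        q.append(i)
        # Drop the front if it slid out of the window.
        if q[0] <= i - window:
            q.popleft()
        if i >= window - 1:
            out.append(values[q[0]])
    return out


print(sliding_min([3, 1, 4, 1, 5, 9, 2, 6], 3))  # [1, 1, 1, 1, 2, 2]
```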