diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 584e92af2e9..60f654f4836 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -4866,12 +4866,6 @@ def apply( runtime compilation features """ - # libcudacxx tuples are not compatible with nvrtc 11.0 - runtime = cuda.cudadrv.runtime.Runtime() - mjr, mnr = runtime.get_version() - if mjr < 11 or (mjr == 11 and mnr < 1): - raise RuntimeError("DataFrame.apply requires CUDA 11.1+") - for dtype in self.dtypes: if ( isinstance(dtype, cudf.core.dtypes._BaseDtype) diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 0538b9abf41..b0315b7e8f1 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -46,6 +46,7 @@ ) from cudf.core.column_accessor import ColumnAccessor from cudf.core.join import merge +from cudf.core.udf.pipeline import compile_or_get from cudf.core.window import Rolling from cudf.utils import ioutils from cudf.utils.docutils import copy_docstring @@ -1455,10 +1456,29 @@ def _apply(self, func): """ Apply `func` across the rows of the frame. 
""" - output_dtype, ptx = cudf.core.udf.pipeline.compile_masked_udf( - func, self.dtypes + kernel, retty = compile_or_get(self, func) + + # Mask and data column preallocated + ans_col = cupy.empty(len(self), dtype=retty) + ans_mask = cudf.core.column.column_empty(len(self), dtype="bool") + launch_args = [(ans_col, ans_mask)] + offsets = [] + for col in self._data.values(): + data = col.data + mask = col.mask + if mask is None: + launch_args.append(data) + else: + launch_args.append((data, mask)) + offsets.append(col.offset) + launch_args += offsets + launch_args.append(len(self)) # size + kernel.forall(len(self))(*launch_args) + + result = cudf.Series(ans_col).set_mask( + libcudf.transform.bools_to_mask(ans_mask) ) - result = cudf._lib.transform.masked_udf(self, ptx, output_dtype) + return result def rank( diff --git a/python/cudf/cudf/core/udf/classes.py b/python/cudf/cudf/core/udf/api.py similarity index 69% rename from python/cudf/cudf/core/udf/classes.py rename to python/cudf/cudf/core/udf/api.py index fe2fbd9daad..23b4d02c57d 100644 --- a/python/cudf/cudf/core/udf/classes.py +++ b/python/cudf/cudf/core/udf/api.py @@ -14,3 +14,11 @@ class Masked: def __init__(self, value, valid): self.value = value self.valid = valid + + +def pack_return(masked_or_scalar): + # Blank function to give us something for the typing and + # lowering to grab onto. Just a dummy function for us to + # call within kernels that will get replaced later by the + # lowered implementation + pass diff --git a/python/cudf/cudf/core/udf/lowering.py b/python/cudf/cudf/core/udf/lowering.py index 1467a61f215..3986abc2bf0 100644 --- a/python/cudf/cudf/core/udf/lowering.py +++ b/python/cudf/cudf/core/udf/lowering.py @@ -9,11 +9,10 @@ ) from numba.extending import lower_builtin, types +from cudf.core.udf import api +from cudf.core.udf._ops import arith_ops, comparison_ops from cudf.core.udf.typing import MaskedType, NAType -from . 
import classes -from ._ops import arith_ops, comparison_ops - @cuda_lowering_registry.lower_constant(NAType) def constant_na(context, builder, ty, pyval): @@ -154,9 +153,8 @@ def register_const_op(op): to_lower_op = make_const_op(op) cuda_lower(op, MaskedType, types.Number)(to_lower_op) cuda_lower(op, types.Number, MaskedType)(to_lower_op) - - # to_lower_op_reflected = make_reflected_const_op(op) - # cuda_lower(op, types.Number, MaskedType)(to_lower_op_reflected) + cuda_lower(op, MaskedType, types.Boolean)(to_lower_op) + cuda_lower(op, types.Boolean, MaskedType)(to_lower_op) # register all lowering at init @@ -194,6 +192,24 @@ def masked_scalar_is_null_impl(context, builder, sig, args): return builder.load(result) +# Main kernel always calls `pack_return` on whatever the user defined +# function returned. This returns the same data if it's already a `Masked` +# else packs it up into a new one that is valid from the get go +@cuda_lower(api.pack_return, MaskedType) +def pack_return_masked_impl(context, builder, sig, args): + return args[0] + + +@cuda_lower(api.pack_return, types.Boolean) +@cuda_lower(api.pack_return, types.Number) +def pack_return_scalar_impl(context, builder, sig, args): + outdata = cgutils.create_struct_proxy(sig.return_type)(context, builder) + outdata.value = args[0] + outdata.valid = context.get_constant(types.boolean, 1) + + return outdata._getvalue() + + @cuda_lower(operator.truth, MaskedType) def masked_scalar_truth_impl(context, builder, sig, args): indata = cgutils.create_struct_proxy(MaskedType(types.boolean))( @@ -253,7 +269,7 @@ def cast_masked_to_masked(context, builder, fromty, toty, val): # Masked constructor for use in a kernel for testing -@lower_builtin(classes.Masked, types.Number, types.boolean) +@lower_builtin(api.Masked, types.Number, types.boolean) def masked_constructor(context, builder, sig, args): ty = sig.return_type value, valid = args diff --git a/python/cudf/cudf/core/udf/pipeline.py 
b/python/cudf/cudf/core/udf/pipeline.py index c7b8be92c00..a410a03be41 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -1,28 +1,40 @@ +import cachetools +import numpy as np +from numba import cuda from numba.np import numpy_support +from numba.types import Tuple, boolean, int64, void from nvtx import annotate +from cudf.core.udf.api import Masked, pack_return from cudf.core.udf.typing import MaskedType from cudf.utils import cudautils +libcudf_bitmask_type = numpy_support.from_dtype(np.dtype("int32")) +MASK_BITSIZE = np.dtype("int32").itemsize * 8 +precompiled: cachetools.LRUCache = cachetools.LRUCache(maxsize=32) + @annotate("NUMBA JIT", color="green", domain="cudf_python") -def compile_masked_udf(func, dtypes): +def get_udf_return_type(func, dtypes): """ - Generate an inlineable PTX function that will be injected into - a variadic kernel inside libcudf - - assume all input types are `MaskedType(input_col.dtype)` and then - compile the requestied PTX function as a function over those types + Get the return type of a masked UDF for a given set of argument dtypes. It + is assumed that a `MaskedType(dtype)` is passed to the function for each + input dtype. """ to_compiler_sig = tuple( MaskedType(arg) for arg in (numpy_support.from_dtype(np_type) for np_type in dtypes) ) - # Get the inlineable PTX function - ptx, numba_output_type = cudautils.compile_udf(func, to_compiler_sig) - numpy_output_type = numpy_support.as_dtype(numba_output_type.value_type) + # Get the return type. The PTX is also returned by compile_udf, but is not + # needed here. 
+ ptx, output_type = cudautils.compile_udf(func, to_compiler_sig) + + if not isinstance(output_type, MaskedType): + numba_output_type = numpy_support.from_dtype(np.dtype(output_type)) + else: + numba_output_type = output_type - return numpy_output_type, ptx + return numba_output_type def nulludf(func): @@ -50,3 +62,158 @@ def wrapper(*args): return to_udf_table._apply(func) return wrapper + + +def masked_array_type_from_col(col): + """ + Return a type representing a tuple of arrays, + the first element an array of the numba type + corresponding to `dtype`, and the second an + array of bools representing a mask. + """ + nb_scalar_ty = numpy_support.from_dtype(col.dtype) + if col.mask is None: + return nb_scalar_ty[::1] + else: + return Tuple((nb_scalar_ty[::1], libcudf_bitmask_type[::1])) + + +def construct_signature(df, return_type): + """ + Build the signature of numba types that will be used to + actually JIT the kernel itself later, accounting for types + and offsets + """ + + # Tuple of arrays, first the output data array, then the mask + return_type = Tuple((return_type[::1], boolean[::1])) + offsets = [] + sig = [return_type] + for col in df._data.values(): + sig.append(masked_array_type_from_col(col)) + offsets.append(int64) + + # return_type + data,masks + offsets + size + sig = void(*(sig + offsets + [int64])) + + return sig + + +@cuda.jit(device=True) +def mask_get(mask, pos): + return (mask[pos // MASK_BITSIZE] >> (pos % MASK_BITSIZE)) & 1 + + +kernel_template = """\ +def _kernel(retval, {input_columns}, {input_offsets}, size): + i = cuda.grid(1) + ret_data_arr, ret_mask_arr = retval + if i < size: +{masked_input_initializers} + ret = {user_udf_call} + ret_masked = pack_return(ret) + ret_data_arr[i] = ret_masked.value + ret_mask_arr[i] = ret_masked.valid +""" + +unmasked_input_initializer_template = """\ + d_{idx} = input_col_{idx} + masked_{idx} = Masked(d_{idx}[i], True) +""" + +masked_input_initializer_template = """\ + d_{idx}, m_{idx} = 
input_col_{idx} + masked_{idx} = Masked(d_{idx}[i], mask_get(m_{idx}, i + offset_{idx})) +""" + + +def _define_function(df, scalar_return=False): + # Create argument list for kernel + input_columns = ", ".join([f"input_col_{i}" for i in range(len(df._data))]) + + input_offsets = ", ".join([f"offset_{i}" for i in range(len(df._data))]) + + # Create argument list to pass to device function + args = ", ".join([f"masked_{i}" for i in range(len(df._data))]) + user_udf_call = f"f_({args})" + + # Generate the initializers for each device function argument + initializers = [] + for i, col in enumerate(df._data.values()): + idx = str(i) + if col.mask is not None: + template = masked_input_initializer_template + else: + template = unmasked_input_initializer_template + + initializer = template.format(idx=idx) + + initializers.append(initializer) + + masked_input_initializers = "\n".join(initializers) + + # Incorporate all of the above into the kernel code template + d = { + "input_columns": input_columns, + "input_offsets": input_offsets, + "masked_input_initializers": masked_input_initializers, + "user_udf_call": user_udf_call, + } + + return kernel_template.format(**d) + + +@annotate("UDF COMPILATION", color="darkgreen", domain="cudf_python") +def compile_or_get(df, f): + """ + Return a compiled kernel in terms of MaskedTypes that launches a + kernel equivalent of `f` for the dtypes of `df`. The kernel uses + a thread for each row and calls `f` using that row's data / mask + to produce an output value and output validity for each row. + + If the UDF has already been compiled for the requested dtypes, + a cached version will be returned instead of running compilation. 
+ + """ + + # check to see if we already compiled this function + cache_key = ( + *cudautils.make_cache_key(f, tuple(df.dtypes)), + *(col.mask is None for col in df._data.values()), + ) + if precompiled.get(cache_key) is not None: + kernel, scalar_return_type = precompiled[cache_key] + return kernel, scalar_return_type + + numba_return_type = get_udf_return_type(f, df.dtypes) + _is_scalar_return = not isinstance(numba_return_type, MaskedType) + scalar_return_type = ( + numba_return_type + if _is_scalar_return + else numba_return_type.value_type + ) + + sig = construct_signature(df, scalar_return_type) + f_ = cuda.jit(device=True)(f) + + # Dict of 'local' variables into which `_kernel` is defined + local_exec_context = {} + global_exec_context = { + "f_": f_, + "cuda": cuda, + "Masked": Masked, + "mask_get": mask_get, + "pack_return": pack_return, + } + exec( + _define_function(df, scalar_return=_is_scalar_return), + global_exec_context, + local_exec_context, + ) + # The python function definition representing the kernel + _kernel = local_exec_context["_kernel"] + kernel = cuda.jit(sig)(_kernel) + scalar_return_type = numpy_support.as_dtype(scalar_return_type) + precompiled[cache_key] = (kernel, scalar_return_type) + + return kernel, scalar_return_type diff --git a/python/cudf/cudf/core/udf/typing.py b/python/cudf/cudf/core/udf/typing.py index 6e026412f24..042d97db838 100644 --- a/python/cudf/cudf/core/udf/typing.py +++ b/python/cudf/cudf/core/udf/typing.py @@ -17,8 +17,8 @@ from numba.cuda.cudadecl import registry as cuda_decl_registry from pandas._libs.missing import NAType as _NAType -from . import classes -from ._ops import arith_ops, comparison_ops +from cudf.core.udf import api +from cudf.core.udf._ops import arith_ops, comparison_ops class MaskedType(types.Type): @@ -101,7 +101,7 @@ def __eq__(self, other): # For typing a Masked constant value defined outside a kernel (e.g. captured in # a closure). 
-@typeof_impl.register(classes.Masked) +@typeof_impl.register(api.Masked) def typeof_masked(val, c): return MaskedType(typeof(val.value)) @@ -110,7 +110,7 @@ def typeof_masked(val, c): # type in a kernel. @cuda_decl_registry.register class MaskedConstructor(ConcreteTemplate): - key = classes.Masked + key = api.Masked cases = [ nb_signature(MaskedType(t), t, types.boolean) @@ -123,20 +123,20 @@ class MaskedConstructor(ConcreteTemplate): make_attribute_wrapper(MaskedType, "valid", "valid") -# Typing for `classes.Masked` +# Typing for `api.Masked` @cuda_decl_registry.register_attr class ClassesTemplate(AttributeTemplate): - key = types.Module(classes) + key = types.Module(api) def resolve_Masked(self, mod): return types.Function(MaskedConstructor) -# Registration of the global is also needed for Numba to type classes.Masked -cuda_decl_registry.register_global(classes, types.Module(classes)) -# For typing bare Masked (as in `from .classes import Masked` +# Registration of the global is also needed for Numba to type api.Masked +cuda_decl_registry.register_global(api, types.Module(api)) +# For typing bare Masked (as in `from .api import Masked` cuda_decl_registry.register_global( - classes.Masked, types.Function(MaskedConstructor) + api.Masked, types.Function(MaskedConstructor) ) @@ -247,10 +247,10 @@ def generic(self, args, kws): # In the case of op(Masked, scalar), we resolve the type between # the Masked value_type and the scalar's type directly if isinstance(args[0], MaskedType) and isinstance( - args[1], types.Number + args[1], (types.Number, types.Boolean) ): to_resolve_types = (args[0].value_type, args[1]) - elif isinstance(args[0], types.Number) and isinstance( + elif isinstance(args[0], (types.Number, types.Boolean)) and isinstance( args[1], MaskedType ): to_resolve_types = (args[1].value_type, args[0]) @@ -287,6 +287,23 @@ def generic(self, args, kws): return nb_signature(types.boolean, MaskedType(types.boolean)) 
+@cuda_decl_registry.register_global(api.pack_return) +class UnpackReturnToMasked(AbstractTemplate): + """ + Turn a returned MaskedType into its value and validity + or turn a scalar into the tuple (scalar, True). + """ + + def generic(self, args, kws): + if isinstance(args[0], MaskedType): + # MaskedType(dtype, valid) -> MaskedType(dtype, valid) + return nb_signature(args[0], args[0]) + elif isinstance(args[0], (types.Number, types.Boolean)): + # scalar_type -> MaskedType(scalar_type, True) + return_type = MaskedType(args[0]) + return nb_signature(return_type, args[0]) + + for op in arith_ops + comparison_ops: # Every op shares the same typing class cuda_decl_registry.register_global(op)(MaskedScalarArithOp) diff --git a/python/cudf/cudf/tests/test_extension_compilation.py b/python/cudf/cudf/tests/test_extension_compilation.py index e527fd0af17..39fa7b11ce2 100644 --- a/python/cudf/cudf/tests/test_extension_compilation.py +++ b/python/cudf/cudf/tests/test_extension_compilation.py @@ -5,7 +5,7 @@ from numba.cuda import compile_ptx from cudf import NA -from cudf.core.udf.classes import Masked +from cudf.core.udf.api import Masked from cudf.core.udf.typing import MaskedType arith_ops = ( diff --git a/python/cudf/cudf/tests/test_udf_masked_ops.py b/python/cudf/cudf/tests/test_udf_masked_ops.py index f73f1526c7f..9a48a98fbc2 100644 --- a/python/cudf/cudf/tests/test_udf_masked_ops.py +++ b/python/cudf/cudf/tests/test_udf_masked_ops.py @@ -2,7 +2,6 @@ import pandas as pd import pytest -from numba import cuda import cudf from cudf.core.udf.pipeline import nulludf @@ -15,12 +14,7 @@ operator.truediv, operator.floordiv, operator.mod, - pytest.param( - operator.pow, - marks=pytest.mark.xfail( - reason="https://github.com/rapidsai/cudf/issues/8470" - ), - ), + operator.pow, ] comparison_ops = [ @@ -34,13 +28,6 @@ def run_masked_udf_test(func_pdf, func_gdf, data, **kwargs): - - # Skip testing CUDA 11.0 - runtime = cuda.cudadrv.runtime.Runtime() - mjr, mnr = 
runtime.get_version() - if mjr < 11 or (mjr == 11 and mnr < 1): - pytest.skip("Skip testing for CUDA 11.0") - gdf = data pdf = data.to_pandas(nullable=True) @@ -91,8 +78,9 @@ def func_gdf(x, y): @pytest.mark.parametrize("op", arith_ops) -@pytest.mark.parametrize("constant", [1, 1.5]) -def test_arith_masked_vs_constant(op, constant): +@pytest.mark.parametrize("constant", [1, 1.5, True, False]) +@pytest.mark.parametrize("data", [[1, 2, cudf.NA]]) +def test_arith_masked_vs_constant(op, constant, data): def func_pdf(x): return op(x, constant) @@ -100,15 +88,28 @@ def func_pdf(x): def func_gdf(x): return op(x, constant) - # Just a single column -> result will be all NA - gdf = cudf.DataFrame({"data": [1, 2, None]}) + gdf = cudf.DataFrame({"data": data}) + if constant is False and op in { + operator.mod, + operator.pow, + operator.truediv, + operator.floordiv, + }: + # The following test cases yield undefined behavior: + # - truediv(x, False) because it's dividing by zero + # - floordiv(x, False) because it's dividing by zero + # - mod(x, False) because it's mod by zero, + # - pow(x, False) because we have an NA in the series and pandas + # insists that (NA**0 == 1) where we do not + pytest.skip() run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) @pytest.mark.parametrize("op", arith_ops) -@pytest.mark.parametrize("constant", [1, 1.5]) -def test_arith_masked_vs_constant_reflected(op, constant): +@pytest.mark.parametrize("constant", [1, 1.5, True, False]) +@pytest.mark.parametrize("data", [[2, 3, cudf.NA], [1, cudf.NA, 1]]) +def test_arith_masked_vs_constant_reflected(op, constant, data): def func_pdf(x): return op(constant, x) @@ -117,13 +118,20 @@ def func_gdf(x): return op(constant, x) # Just a single column -> result will be all NA - gdf = cudf.DataFrame({"data": [1, 2, None]}) - + gdf = cudf.DataFrame({"data": data}) + + if constant == 1 and op is operator.pow: + # The following test cases yield differing results from pandas: + # - 1**NA + # - True**NA + 
# both due to pandas insisting that this is equal to 1. + pytest.skip() run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) @pytest.mark.parametrize("op", arith_ops) -def test_arith_masked_vs_null(op): +@pytest.mark.parametrize("data", [[1, cudf.NA, 3], [2, 3, cudf.NA]]) +def test_arith_masked_vs_null(op, data): def func_pdf(x): return op(x, pd.NA) @@ -131,7 +139,11 @@ def func_pdf(x): def func_gdf(x): return op(x, cudf.NA) - gdf = cudf.DataFrame({"data": [1, None, 3]}) + gdf = cudf.DataFrame({"data": data}) + + if 1 in gdf["data"] and op is operator.pow: + # In pandas, 1**NA == 1. + pytest.skip() run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) @@ -255,6 +267,18 @@ def func_gdf(x): run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) +def test_apply_return_literal_only(): + def func_pdf(x): + return 5 + + @nulludf + def func_gdf(x): + return 5 + + gdf = cudf.DataFrame({"a": [1, None, 3]}) + run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) + + def test_apply_everything(): def func_pdf(w, x, y, z): if x is pd.NA: diff --git a/python/cudf/cudf/utils/cudautils.py b/python/cudf/cudf/utils/cudautils.py index 727bbb1c345..7b7fe674210 100755 --- a/python/cudf/cudf/utils/cudautils.py +++ b/python/cudf/cudf/utils/cudautils.py @@ -210,6 +210,16 @@ def grouped_window_sizes_from_offset(arr, group_starts, offset): _udf_code_cache: cachetools.LRUCache = cachetools.LRUCache(maxsize=32) +def make_cache_key(udf, sig): + codebytes = udf.__code__.co_code + if udf.__closure__ is not None: + cvars = tuple([x.cell_contents for x in udf.__closure__]) + cvarbytes = dumps(cvars) + else: + cvarbytes = b"" + return codebytes, cvarbytes, sig + + def compile_udf(udf, type_signature): """Compile ``udf`` with `numba` @@ -244,14 +254,7 @@ def compile_udf(udf, type_signature): # Check if we've already compiled a similar (but possibly distinct) # function before - codebytes = udf.__code__.co_code - if udf.__closure__ is not None: - cvars = 
tuple([x.cell_contents for x in udf.__closure__]) - cvarbytes = dumps(cvars) - else: - cvarbytes = b"" - - key = (type_signature, codebytes, cvarbytes) + key = make_cache_key(udf, type_signature) res = _udf_code_cache.get(key) if res: return res