From 8d98b73b62aab7f78f7ffe88e069a62b435eb54a Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Mon, 30 Aug 2021 11:43:31 -0700 Subject: [PATCH 01/34] impl --- python/cudf/cudf/core/frame.py | 5 +- python/cudf/cudf/core/udf/pipeline.py | 92 ++++++++++++++++++++++++++- 2 files changed, 92 insertions(+), 5 deletions(-) diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 4f46794aa3f..f7e7a9cfc8c 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -1545,10 +1545,7 @@ def _apply(self, func): """ Apply `func` across the rows of the frame. """ - output_dtype, ptx = cudf.core.udf.pipeline.compile_masked_udf( - func, self.dtypes - ) - result = cudf._lib.transform.masked_udf(self, ptx, output_dtype) + result = cudf.core.udf.pipeline.udf_pipeline(self, func) return result def rank( diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index c7b8be92c00..195af3a10e1 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -2,8 +2,12 @@ from nvtx import annotate from cudf.core.udf.typing import MaskedType +from cudf.core.udf.classes import Masked from cudf.utils import cudautils - +from numba import cuda +import cupy +import cudf +from cudf._lib.transform import bools_to_mask @annotate("NUMBA JIT", color="green", domain="cudf_python") def compile_masked_udf(func, dtypes): @@ -50,3 +54,89 @@ def wrapper(*args): return to_udf_table._apply(func) return wrapper + +from numba.types import int32, boolean, void, Tuple +import numpy as np +libcudf_bitmask_type = numpy_support.from_dtype(np.dtype('int32')) + +def masked_arrty_from_np_type(dtype): + """ + Return a type representing a tuple of arrays, + the first element an array of the numba type + corresponding to `dtype`, and the second an + array of bools represe + """ + nb_scalar_ty = numpy_support.from_dtype(dtype) + return Tuple( + ( + nb_scalar_ty[::1], + libcudf_bitmask_type[::1] + ) + ) 
+ +mask_bitsize = np.dtype('int32').itemsize * 8 + +@cuda.jit(device=True) +def mask_get(mask, pos): + return (mask[pos // mask_bitsize] >> (pos % mask_bitsize)) & 1 + + +def _define_function(df): + + num_vars = len(df.dtypes) + + start = "def _kernel(retval, " + + sig = ", ".join(["input_col_" + str(i) for i in range(num_vars)]) + start += (sig + "):\n") + + start += "\ti = cuda.grid(1)\n" + start += "\tret_data_arr, ret_mask_arr = retval\n" + + + fargs = [] + for i in range(num_vars): + ii = str(i) + start += "\td_"+ii+","+"m_"+ii+"=input_col_"+ii+"\n" + arg = "masked_"+ii + start += "\t"+arg+"="+"Masked("+"d_"+ii+"[i]"+","+"mask_get(m_"+ii+","+"i)"+")\n" + fargs.append(arg) + + fargs = "(" + ",".join(fargs) + ")\n" + start += "\tret = f_"+fargs+"\n" + + start += "\tret_data_arr[i] = ret.value\n" + start += "\tret_mask_arr[i] = ret.valid\n" + #start += "\tret_mask_arr[i] = True\n" + + return start + +def udf_pipeline(df, f): + retty = compile_masked_udf(f, df.dtypes)[0] + + return_type = Tuple( + (numpy_support.from_dtype(retty)[::1], boolean[::1]) + ) + sig = void(return_type, *[masked_arrty_from_np_type(dtype) for dtype in df.dtypes]) + global f_ + + f_ = cuda.jit(device=True)(f) + # Set f_launch into the global namespace + exec(_define_function(df), globals()) + # compile + kernel = cuda.jit(sig)(_kernel) + ans_col = cupy.empty(len(df), dtype=retty) + ans_mask = cudf.core.column.column_empty(len(df), dtype='bool') + + launch_args = [(ans_col, ans_mask)] + for col in df.columns: + data = df[col]._column.data + mask = df[col]._column.mask + if mask is None: + # FIXME - this should work if there isnt a mask + mask = bools_to_mask(cudf.core.column.as_column(cupy.ones(len(df), dtype='bool'))) + launch_args.append((data, mask)) + + kernel[1, len(df)](*launch_args) + result = cudf.Series(ans_col).set_mask(bools_to_mask(ans_mask)) + return result From 45c47ee53e51e80d83e4d5bb1daa512eaa7c639a Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Mon, 30 Aug 2021 
12:31:27 -0700 Subject: [PATCH 02/34] purge c++ code --- .../Modules/JitifyPreprocessKernels.cmake | 1 - cpp/include/cudf/transform.hpp | 6 -- cpp/src/transform/jit/masked_udf_kernel.cu | 85 ---------------- cpp/src/transform/transform.cpp | 98 ------------------- python/cudf/cudf/_lib/cpp/transform.pxd | 6 -- python/cudf/cudf/_lib/transform.pyx | 21 ---- python/cudf/cudf/core/udf/pipeline.py | 1 - 7 files changed, 218 deletions(-) delete mode 100644 cpp/src/transform/jit/masked_udf_kernel.cu diff --git a/cpp/cmake/Modules/JitifyPreprocessKernels.cmake b/cpp/cmake/Modules/JitifyPreprocessKernels.cmake index 7e2ec5254d3..eb1ade61440 100644 --- a/cpp/cmake/Modules/JitifyPreprocessKernels.cmake +++ b/cpp/cmake/Modules/JitifyPreprocessKernels.cmake @@ -56,7 +56,6 @@ endfunction() jit_preprocess_files(SOURCE_DIRECTORY ${CUDF_SOURCE_DIR}/src FILES binaryop/jit/kernel.cu - transform/jit/masked_udf_kernel.cu transform/jit/kernel.cu rolling/jit/kernel.cu ) diff --git a/cpp/include/cudf/transform.hpp b/cpp/include/cudf/transform.hpp index af2858d948e..f0c69549b0a 100644 --- a/cpp/include/cudf/transform.hpp +++ b/cpp/include/cudf/transform.hpp @@ -54,12 +54,6 @@ std::unique_ptr transform( bool is_ptx, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); -std::unique_ptr generalized_masked_op( - table_view const& data_view, - std::string const& binary_udf, - data_type output_type, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); - /** * @brief Creates a null_mask from `input` by converting `NaN` to null and * preserving existing null values and also returns new null_count. diff --git a/cpp/src/transform/jit/masked_udf_kernel.cu b/cpp/src/transform/jit/masked_udf_kernel.cu deleted file mode 100644 index 319ad730c53..00000000000 --- a/cpp/src/transform/jit/masked_udf_kernel.cu +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright (c) 2021, NVIDIA CORPORATION. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include - -#include -#include - -#include -#include -#include -#include -#include - -namespace cudf { -namespace transformation { -namespace jit { - -template -struct Masked { - T value; - bool valid; -}; - -template -__device__ auto make_args(cudf::size_type id, TypeIn in_ptr, MaskType in_mask, OffsetType in_offset) -{ - bool valid = in_mask ? cudf::bit_is_set(in_mask, in_offset + id) : true; - return cuda::std::make_tuple(in_ptr[id], valid); -} - -template -__device__ auto make_args(cudf::size_type id, - InType in_ptr, - MaskType in_mask, // in practice, always cudf::bitmask_type const* - OffsetType in_offset, // in practice, always cudf::size_type - Arguments... args) -{ - bool valid = in_mask ? cudf::bit_is_set(in_mask, in_offset + id) : true; - return cuda::std::tuple_cat(cuda::std::make_tuple(in_ptr[id], valid), make_args(id, args...)); -} - -template -__global__ void generic_udf_kernel(cudf::size_type size, - TypeOut* out_data, - bool* out_mask, - Arguments... args) -{ - int const tid = threadIdx.x; - int const blkid = blockIdx.x; - int const blksz = blockDim.x; - int const gridsz = gridDim.x; - int const start = tid + blkid * blksz; - int const step = blksz * gridsz; - - Masked output; - for (cudf::size_type i = start; i < size; i += step) { - auto func_args = cuda::std::tuple_cat( - cuda::std::make_tuple(&output.value), - make_args(i, args...) 
// passed int64*, bool*, int64, int64*, bool*, int64 - ); - cuda::std::apply(GENERIC_OP, func_args); - out_data[i] = output.value; - out_mask[i] = output.valid; - } -} - -} // namespace jit -} // namespace transformation -} // namespace cudf diff --git a/cpp/src/transform/transform.cpp b/cpp/src/transform/transform.cpp index 5230b853a79..d62a5e7076c 100644 --- a/cpp/src/transform/transform.cpp +++ b/cpp/src/transform/transform.cpp @@ -65,79 +65,6 @@ void unary_operation(mutable_column_view output, cudf::jit::get_data_ptr(input)); } -std::vector make_template_types(column_view outcol_view, table_view const& data_view) -{ - std::string mskptr_type = - cudf::jit::get_type_name(cudf::data_type(cudf::type_to_id())) + "*"; - std::string offset_type = - cudf::jit::get_type_name(cudf::data_type(cudf::type_to_id())); - - std::vector template_types; - template_types.reserve((3 * data_view.num_columns()) + 1); - - template_types.push_back(cudf::jit::get_type_name(outcol_view.type())); - for (auto const& col : data_view) { - template_types.push_back(cudf::jit::get_type_name(col.type()) + "*"); - template_types.push_back(mskptr_type); - template_types.push_back(offset_type); - } - return template_types; -} - -void generalized_operation(table_view const& data_view, - std::string const& udf, - data_type output_type, - mutable_column_view outcol_view, - mutable_column_view outmsk_view, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - auto const template_types = make_template_types(outcol_view, data_view); - - std::string generic_kernel_name = - jitify2::reflection::Template("cudf::transformation::jit::generic_udf_kernel") - .instantiate(template_types); - - std::string generic_cuda_source = cudf::jit::parse_single_function_ptx( - udf, "GENERIC_OP", cudf::jit::get_type_name(output_type), {0}); - - std::vector kernel_args; - kernel_args.reserve((data_view.num_columns() * 3) + 3); - - cudf::size_type size = outcol_view.size(); - const void* outcol_ptr = 
cudf::jit::get_data_ptr(outcol_view); - const void* outmsk_ptr = cudf::jit::get_data_ptr(outmsk_view); - kernel_args.insert(kernel_args.begin(), {&size, &outcol_ptr, &outmsk_ptr}); - - std::vector data_ptrs; - std::vector mask_ptrs; - std::vector offsets; - - data_ptrs.reserve(data_view.num_columns()); - mask_ptrs.reserve(data_view.num_columns()); - offsets.reserve(data_view.num_columns()); - - auto const iters = thrust::make_zip_iterator( - thrust::make_tuple(data_ptrs.begin(), mask_ptrs.begin(), offsets.begin())); - - std::for_each(iters, iters + data_view.num_columns(), [&](auto const& tuple_vals) { - kernel_args.push_back(&thrust::get<0>(tuple_vals)); - kernel_args.push_back(&thrust::get<1>(tuple_vals)); - kernel_args.push_back(&thrust::get<2>(tuple_vals)); - }); - - std::transform(data_view.begin(), data_view.end(), iters, [&](column_view const& col) { - return thrust::make_tuple(cudf::jit::get_data_ptr(col), col.null_mask(), col.offset()); - }); - - cudf::jit::get_program_cache(*transform_jit_masked_udf_kernel_cu_jit) - .get_kernel(generic_kernel_name, - {}, - {{"transform/jit/operation-udf.hpp", generic_cuda_source}}, - {"-arch=sm_."}) - ->configure_1d_max_occupancy(0, 0, 0, stream.value()) - ->launch(kernel_args.data()); -} } // namespace jit } // namespace transformation @@ -165,24 +92,6 @@ std::unique_ptr transform(column_view const& input, return output; } -std::unique_ptr generalized_masked_op(table_view const& data_view, - std::string const& udf, - data_type output_type, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - std::unique_ptr output = make_fixed_width_column(output_type, data_view.num_rows()); - std::unique_ptr output_mask = - make_fixed_width_column(cudf::data_type{cudf::type_id::BOOL8}, data_view.num_rows()); - - transformation::jit::generalized_operation( - data_view, udf, output_type, *output, *output_mask, stream, mr); - - auto final_output_mask = cudf::bools_to_mask(*output_mask); - 
output.get()->set_null_mask(std::move(*(final_output_mask.first))); - return output; -} - } // namespace detail std::unique_ptr transform(column_view const& input, @@ -195,12 +104,5 @@ std::unique_ptr transform(column_view const& input, return detail::transform(input, unary_udf, output_type, is_ptx, rmm::cuda_stream_default, mr); } -std::unique_ptr generalized_masked_op(table_view const& data_view, - std::string const& udf, - data_type output_type, - rmm::mr::device_memory_resource* mr) -{ - return detail::generalized_masked_op(data_view, udf, output_type, rmm::cuda_stream_default, mr); -} } // namespace cudf diff --git a/python/cudf/cudf/_lib/cpp/transform.pxd b/python/cudf/cudf/_lib/cpp/transform.pxd index 907a85ed593..484e3997f34 100644 --- a/python/cudf/cudf/_lib/cpp/transform.pxd +++ b/python/cudf/cudf/_lib/cpp/transform.pxd @@ -34,12 +34,6 @@ cdef extern from "cudf/transform.hpp" namespace "cudf" nogil: bool is_ptx ) except + - cdef unique_ptr[column] generalized_masked_op( - const table_view& data_view, - string udf, - data_type output_type, - ) except + - cdef pair[unique_ptr[table], unique_ptr[column]] encode( table_view input ) except + diff --git a/python/cudf/cudf/_lib/transform.pyx b/python/cudf/cudf/_lib/transform.pyx index 9fada59640e..ce8e6519af8 100644 --- a/python/cudf/cudf/_lib/transform.pyx +++ b/python/cudf/cudf/_lib/transform.pyx @@ -124,27 +124,6 @@ def transform(Column input, op): return Column.from_unique_ptr(move(c_output)) -def masked_udf(Table incols, op, output_type): - cdef table_view data_view = incols.data_view() - cdef string c_str = op.encode("UTF-8") - cdef type_id c_tid - cdef data_type c_dtype - - c_tid = ( - np_to_cudf_types[output_type] - ) - c_dtype = data_type(c_tid) - - with nogil: - c_output = move(libcudf_transform.generalized_masked_op( - data_view, - c_str, - c_dtype, - )) - - return Column.from_unique_ptr(move(c_output)) - - def table_encode(Table input): cdef table_view c_input = input.data_view() cdef 
pair[unique_ptr[table], unique_ptr[column]] c_result diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index 195af3a10e1..0dcc37c3c54 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -107,7 +107,6 @@ def _define_function(df): start += "\tret_data_arr[i] = ret.value\n" start += "\tret_mask_arr[i] = ret.valid\n" - #start += "\tret_mask_arr[i] = True\n" return start From c2e02182db068d8abb7ec3a0ff252ad3954c0108 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Mon, 30 Aug 2021 12:34:54 -0700 Subject: [PATCH 03/34] enable cuda 11.0 --- python/cudf/cudf/core/dataframe.py | 6 ------ python/cudf/cudf/tests/test_udf_masked_ops.py | 7 ------- 2 files changed, 13 deletions(-) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 721ebf22de7..8698f747650 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -4757,12 +4757,6 @@ def apply(self, func, axis=1): runtime compilation features """ - # libcudacxx tuples are not compatible with nvrtc 11.0 - runtime = cuda.cudadrv.runtime.Runtime() - mjr, mnr = runtime.get_version() - if mjr < 11 or (mjr == 11 and mnr < 1): - raise RuntimeError("DataFrame.apply requires CUDA 11.1+") - for dtype in self.dtypes: if ( isinstance(dtype, cudf.core.dtypes._BaseDtype) diff --git a/python/cudf/cudf/tests/test_udf_masked_ops.py b/python/cudf/cudf/tests/test_udf_masked_ops.py index f73f1526c7f..42c94db71d3 100644 --- a/python/cudf/cudf/tests/test_udf_masked_ops.py +++ b/python/cudf/cudf/tests/test_udf_masked_ops.py @@ -34,13 +34,6 @@ def run_masked_udf_test(func_pdf, func_gdf, data, **kwargs): - - # Skip testing CUDA 11.0 - runtime = cuda.cudadrv.runtime.Runtime() - mjr, mnr = runtime.get_version() - if mjr < 11 or (mjr == 11 and mnr < 1): - pytest.skip("Skip testing for CUDA 11.0") - gdf = data pdf = data.to_pandas(nullable=True) From 69f92cf98d7a4f58355e26c55e97173dc62f4c3d Mon 
Sep 17 00:00:00 2001 From: brandon-b-miller Date: Mon, 30 Aug 2021 12:37:37 -0700 Subject: [PATCH 04/34] enable tests for __pow__ --- python/cudf/cudf/tests/test_udf_masked_ops.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/python/cudf/cudf/tests/test_udf_masked_ops.py b/python/cudf/cudf/tests/test_udf_masked_ops.py index 42c94db71d3..7e83daf9c5a 100644 --- a/python/cudf/cudf/tests/test_udf_masked_ops.py +++ b/python/cudf/cudf/tests/test_udf_masked_ops.py @@ -15,12 +15,7 @@ operator.truediv, operator.floordiv, operator.mod, - pytest.param( - operator.pow, - marks=pytest.mark.xfail( - reason="https://github.com/rapidsai/cudf/issues/8470" - ), - ), + operator.pow ] comparison_ops = [ From b636af9156a0f3e0cfb86d2b17bae362ff278e65 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 31 Aug 2021 07:04:49 -0700 Subject: [PATCH 05/34] solve multiple problems --- python/cudf/cudf/core/udf/pipeline.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index 0dcc37c3c54..a15c2aab85d 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -117,12 +117,14 @@ def udf_pipeline(df, f): (numpy_support.from_dtype(retty)[::1], boolean[::1]) ) sig = void(return_type, *[masked_arrty_from_np_type(dtype) for dtype in df.dtypes]) - global f_ f_ = cuda.jit(device=True)(f) # Set f_launch into the global namespace - exec(_define_function(df), globals()) + lcl = {} + exec(_define_function(df), {'f_': f_, 'cuda': cuda, "Masked": Masked, "mask_get": mask_get}, lcl) + _kernel = lcl['_kernel'] # compile + breakpoint() kernel = cuda.jit(sig)(_kernel) ans_col = cupy.empty(len(df), dtype=retty) ans_mask = cudf.core.column.column_empty(len(df), dtype='bool') @@ -136,6 +138,6 @@ def udf_pipeline(df, f): mask = bools_to_mask(cudf.core.column.as_column(cupy.ones(len(df), dtype='bool'))) launch_args.append((data, mask)) - 
kernel[1, len(df)](*launch_args) + kernel.forall(len(df))(*launch_args, len(df)) result = cudf.Series(ans_col).set_mask(bools_to_mask(ans_mask)) return result From bdf58230034c592f1b22ef9930c2a8a3174a6556 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Thu, 2 Sep 2021 13:14:11 -0700 Subject: [PATCH 06/34] masks are required for entry --- python/cudf/cudf/core/udf/pipeline.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index a15c2aab85d..f3fb94c1da8 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -82,26 +82,26 @@ def mask_get(mask, pos): def _define_function(df): - - num_vars = len(df.dtypes) - + start = "def _kernel(retval, " - sig = ", ".join(["input_col_" + str(i) for i in range(num_vars)]) + sig = ", ".join(["input_col_" + str(i) for i in range(len(df.columns))]) start += (sig + "):\n") start += "\ti = cuda.grid(1)\n" start += "\tret_data_arr, ret_mask_arr = retval\n" - fargs = [] - for i in range(num_vars): + for i, col in enumerate(df._data.values()): ii = str(i) start += "\td_"+ii+","+"m_"+ii+"=input_col_"+ii+"\n" arg = "masked_"+ii - start += "\t"+arg+"="+"Masked("+"d_"+ii+"[i]"+","+"mask_get(m_"+ii+","+"i)"+")\n" + if col.mask is not None: + start += "\t"+arg+"="+"Masked("+"d_"+ii+"[i]"+","+"mask_get(m_"+ii+","+"i)"+")\n" + else: + start += "\t"+arg+"="+"Masked("+"d_"+ii+"[i]"+","+"True"+")\n" fargs.append(arg) - + fargs = "(" + ",".join(fargs) + ")\n" start += "\tret = f_"+fargs+"\n" @@ -124,20 +124,17 @@ def udf_pipeline(df, f): exec(_define_function(df), {'f_': f_, 'cuda': cuda, "Masked": Masked, "mask_get": mask_get}, lcl) _kernel = lcl['_kernel'] # compile - breakpoint() kernel = cuda.jit(sig)(_kernel) ans_col = cupy.empty(len(df), dtype=retty) ans_mask = cudf.core.column.column_empty(len(df), dtype='bool') - launch_args = [(ans_col, ans_mask)] for col in df.columns: data = 
df[col]._column.data mask = df[col]._column.mask if mask is None: - # FIXME - this should work if there isnt a mask - mask = bools_to_mask(cudf.core.column.as_column(cupy.ones(len(df), dtype='bool'))) + mask = cudf.core.buffer.Buffer() launch_args.append((data, mask)) - kernel.forall(len(df))(*launch_args, len(df)) + kernel[1, len(df)](*launch_args) result = cudf.Series(ans_col).set_mask(bools_to_mask(ans_mask)) return result From 470d25e711d36103e1dea355310c23605f2ba2c8 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Thu, 2 Sep 2021 19:56:37 -0700 Subject: [PATCH 07/34] support returning a single number --- python/cudf/cudf/core/udf/pipeline.py | 15 ++++++++++----- python/cudf/cudf/tests/test_udf_masked_ops.py | 11 +++++++++++ 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index f3fb94c1da8..a79c2089dea 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -24,7 +24,7 @@ def compile_masked_udf(func, dtypes): ) # Get the inlineable PTX function ptx, numba_output_type = cudautils.compile_udf(func, to_compiler_sig) - numpy_output_type = numpy_support.as_dtype(numba_output_type.value_type) + numpy_output_type = numpy_support.as_dtype(numba_output_type.value_type) if isinstance(numba_output_type, MaskedType) else np.dtype(numba_output_type) return numpy_output_type, ptx @@ -81,7 +81,7 @@ def mask_get(mask, pos): return (mask[pos // mask_bitsize] >> (pos % mask_bitsize)) & 1 -def _define_function(df): +def _define_function(df, scalar_return=False): start = "def _kernel(retval, " @@ -105,13 +105,18 @@ def _define_function(df): fargs = "(" + ",".join(fargs) + ")\n" start += "\tret = f_"+fargs+"\n" - start += "\tret_data_arr[i] = ret.value\n" - start += "\tret_mask_arr[i] = ret.valid\n" + if scalar_return: + start += "\tret_data_arr[i] = ret\n" + start += "\tret_mask_arr[i] = True\n" + else: + start += "\tret_data_arr[i] = 
ret.value\n" + start += "\tret_mask_arr[i] = ret.valid\n" return start def udf_pipeline(df, f): retty = compile_masked_udf(f, df.dtypes)[0] + _is_scalar_return = not isinstance(retty, MaskedType) return_type = Tuple( (numpy_support.from_dtype(retty)[::1], boolean[::1]) @@ -121,7 +126,7 @@ def udf_pipeline(df, f): f_ = cuda.jit(device=True)(f) # Set f_launch into the global namespace lcl = {} - exec(_define_function(df), {'f_': f_, 'cuda': cuda, "Masked": Masked, "mask_get": mask_get}, lcl) + exec(_define_function(df, scalar_return=_is_scalar_return), {'f_': f_, 'cuda': cuda, "Masked": Masked, "mask_get": mask_get}, lcl) _kernel = lcl['_kernel'] # compile kernel = cuda.jit(sig)(_kernel) diff --git a/python/cudf/cudf/tests/test_udf_masked_ops.py b/python/cudf/cudf/tests/test_udf_masked_ops.py index 7e83daf9c5a..b0e3a785bf5 100644 --- a/python/cudf/cudf/tests/test_udf_masked_ops.py +++ b/python/cudf/cudf/tests/test_udf_masked_ops.py @@ -242,6 +242,17 @@ def func_gdf(x): gdf = cudf.DataFrame({"a": [1, 3, 6]}) run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) +def test_apply_return_literal_only(): + def func_pdf(x): + return 5 + + @nulludf + def func_gdf(x): + return 5 + + gdf = cudf.DataFrame({'a': [1, None, 3]}) + run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) + def test_apply_everything(): def func_pdf(w, x, y, z): From e271ce452f9f314976829061d99e30a5cca0f9dc Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Fri, 3 Sep 2021 07:23:21 -0700 Subject: [PATCH 08/34] formatting --- python/cudf/cudf/core/udf/pipeline.py | 101 ++++++++++++------ python/cudf/cudf/tests/test_udf_masked_ops.py | 6 +- 2 files changed, 69 insertions(+), 38 deletions(-) diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index a79c2089dea..486985e7cf8 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -1,13 +1,19 @@ +import cupy +import numpy as np +from numba import cuda from 
numba.np import numpy_support +from numba.types import Tuple, boolean, void from nvtx import annotate -from cudf.core.udf.typing import MaskedType -from cudf.core.udf.classes import Masked -from cudf.utils import cudautils -from numba import cuda -import cupy import cudf from cudf._lib.transform import bools_to_mask +from cudf.core.udf.classes import Masked +from cudf.core.udf.typing import MaskedType +from cudf.utils import cudautils + +libcudf_bitmask_type = numpy_support.from_dtype(np.dtype("int32")) +mask_bitsize = np.dtype("int32").itemsize * 8 + @annotate("NUMBA JIT", color="green", domain="cudf_python") def compile_masked_udf(func, dtypes): @@ -24,7 +30,11 @@ def compile_masked_udf(func, dtypes): ) # Get the inlineable PTX function ptx, numba_output_type = cudautils.compile_udf(func, to_compiler_sig) - numpy_output_type = numpy_support.as_dtype(numba_output_type.value_type) if isinstance(numba_output_type, MaskedType) else np.dtype(numba_output_type) + numpy_output_type = ( + numpy_support.as_dtype(numba_output_type.value_type) + if isinstance(numba_output_type, MaskedType) + else np.dtype(numba_output_type) + ) return numpy_output_type, ptx @@ -55,26 +65,17 @@ def wrapper(*args): return wrapper -from numba.types import int32, boolean, void, Tuple -import numpy as np -libcudf_bitmask_type = numpy_support.from_dtype(np.dtype('int32')) def masked_arrty_from_np_type(dtype): """ Return a type representing a tuple of arrays, the first element an array of the numba type - corresponding to `dtype`, and the second an + corresponding to `dtype`, and the second an array of bools represe """ nb_scalar_ty = numpy_support.from_dtype(dtype) - return Tuple( - ( - nb_scalar_ty[::1], - libcudf_bitmask_type[::1] - ) - ) + return Tuple((nb_scalar_ty[::1], libcudf_bitmask_type[::1])) -mask_bitsize = np.dtype('int32').itemsize * 8 @cuda.jit(device=True) def mask_get(mask, pos): @@ -82,29 +83,54 @@ def mask_get(mask, pos): def _define_function(df, scalar_return=False): - + start 
= "def _kernel(retval, " - + sig = ", ".join(["input_col_" + str(i) for i in range(len(df.columns))]) - start += (sig + "):\n") - + start += sig + "):\n" + start += "\ti = cuda.grid(1)\n" start += "\tret_data_arr, ret_mask_arr = retval\n" - + fargs = [] for i, col in enumerate(df._data.values()): ii = str(i) - start += "\td_"+ii+","+"m_"+ii+"=input_col_"+ii+"\n" - arg = "masked_"+ii + start += "\td_" + ii + "," + "m_" + ii + "=input_col_" + ii + "\n" + arg = "masked_" + ii if col.mask is not None: - start += "\t"+arg+"="+"Masked("+"d_"+ii+"[i]"+","+"mask_get(m_"+ii+","+"i)"+")\n" + start += ( + "\t" + + arg + + "=" + + "Masked(" + + "d_" + + ii + + "[i]" + + "," + + "mask_get(m_" + + ii + + "," + + "i)" + + ")\n" + ) else: - start += "\t"+arg+"="+"Masked("+"d_"+ii+"[i]"+","+"True"+")\n" + start += ( + "\t" + + arg + + "=" + + "Masked(" + + "d_" + + ii + + "[i]" + + "," + + "True" + + ")\n" + ) fargs.append(arg) fargs = "(" + ",".join(fargs) + ")\n" - start += "\tret = f_"+fargs+"\n" - + start += "\tret = f_" + fargs + "\n" + if scalar_return: start += "\tret_data_arr[i] = ret\n" start += "\tret_mask_arr[i] = True\n" @@ -114,24 +140,29 @@ def _define_function(df, scalar_return=False): return start + def udf_pipeline(df, f): retty = compile_masked_udf(f, df.dtypes)[0] _is_scalar_return = not isinstance(retty, MaskedType) - return_type = Tuple( - (numpy_support.from_dtype(retty)[::1], boolean[::1]) + return_type = Tuple((numpy_support.from_dtype(retty)[::1], boolean[::1])) + sig = void( + return_type, *[masked_arrty_from_np_type(dtype) for dtype in df.dtypes] ) - sig = void(return_type, *[masked_arrty_from_np_type(dtype) for dtype in df.dtypes]) - + f_ = cuda.jit(device=True)(f) # Set f_launch into the global namespace lcl = {} - exec(_define_function(df, scalar_return=_is_scalar_return), {'f_': f_, 'cuda': cuda, "Masked": Masked, "mask_get": mask_get}, lcl) - _kernel = lcl['_kernel'] + exec( + _define_function(df, scalar_return=_is_scalar_return), + {"f_": f_, 
"cuda": cuda, "Masked": Masked, "mask_get": mask_get}, + lcl, + ) + _kernel = lcl["_kernel"] # compile kernel = cuda.jit(sig)(_kernel) ans_col = cupy.empty(len(df), dtype=retty) - ans_mask = cudf.core.column.column_empty(len(df), dtype='bool') + ans_mask = cudf.core.column.column_empty(len(df), dtype="bool") launch_args = [(ans_col, ans_mask)] for col in df.columns: data = df[col]._column.data diff --git a/python/cudf/cudf/tests/test_udf_masked_ops.py b/python/cudf/cudf/tests/test_udf_masked_ops.py index b0e3a785bf5..621bb2b7fb4 100644 --- a/python/cudf/cudf/tests/test_udf_masked_ops.py +++ b/python/cudf/cudf/tests/test_udf_masked_ops.py @@ -2,7 +2,6 @@ import pandas as pd import pytest -from numba import cuda import cudf from cudf.core.udf.pipeline import nulludf @@ -15,7 +14,7 @@ operator.truediv, operator.floordiv, operator.mod, - operator.pow + operator.pow, ] comparison_ops = [ @@ -242,6 +241,7 @@ def func_gdf(x): gdf = cudf.DataFrame({"a": [1, 3, 6]}) run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) + def test_apply_return_literal_only(): def func_pdf(x): return 5 @@ -250,7 +250,7 @@ def func_pdf(x): def func_gdf(x): return 5 - gdf = cudf.DataFrame({'a': [1, None, 3]}) + gdf = cudf.DataFrame({"a": [1, None, 3]}) run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) From 0a971f0fe882ef2c979cd0eed74a5cfa5490ced1 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Fri, 3 Sep 2021 08:13:48 -0700 Subject: [PATCH 09/34] bugfix --- python/cudf/cudf/core/udf/pipeline.py | 31 ++++++++++++++++----------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index 486985e7cf8..d14fa512192 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -29,14 +29,14 @@ def compile_masked_udf(func, dtypes): for arg in (numpy_support.from_dtype(np_type) for np_type in dtypes) ) # Get the inlineable PTX function - ptx, 
numba_output_type = cudautils.compile_udf(func, to_compiler_sig) - numpy_output_type = ( - numpy_support.as_dtype(numba_output_type.value_type) - if isinstance(numba_output_type, MaskedType) - else np.dtype(numba_output_type) - ) + ptx, output_type = cudautils.compile_udf(func, to_compiler_sig) + + if not isinstance(output_type, MaskedType): + numba_output_type = numpy_support.from_dtype(np.dtype(output_type)) + else: + numba_output_type = output_type - return numpy_output_type, ptx + return numba_output_type, ptx def nulludf(func): @@ -130,7 +130,6 @@ def _define_function(df, scalar_return=False): fargs = "(" + ",".join(fargs) + ")\n" start += "\tret = f_" + fargs + "\n" - if scalar_return: start += "\tret_data_arr[i] = ret\n" start += "\tret_mask_arr[i] = True\n" @@ -142,10 +141,14 @@ def _define_function(df, scalar_return=False): def udf_pipeline(df, f): - retty = compile_masked_udf(f, df.dtypes)[0] - _is_scalar_return = not isinstance(retty, MaskedType) - - return_type = Tuple((numpy_support.from_dtype(retty)[::1], boolean[::1])) + numba_return_type, _ = compile_masked_udf(f, df.dtypes) + _is_scalar_return = not isinstance(numba_return_type, MaskedType) + scalar_return_type = ( + numba_return_type + if _is_scalar_return + else numba_return_type.value_type + ) + return_type = Tuple((scalar_return_type[::1], boolean[::1])) sig = void( return_type, *[masked_arrty_from_np_type(dtype) for dtype in df.dtypes] ) @@ -161,7 +164,9 @@ def udf_pipeline(df, f): _kernel = lcl["_kernel"] # compile kernel = cuda.jit(sig)(_kernel) - ans_col = cupy.empty(len(df), dtype=retty) + ans_col = cupy.empty( + len(df), dtype=numpy_support.as_dtype(scalar_return_type) + ) ans_mask = cudf.core.column.column_empty(len(df), dtype="bool") launch_args = [(ans_col, ans_mask)] for col in df.columns: From 2f7e6f8cb67995d9c6509aed7b1834f2a1349835 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Fri, 3 Sep 2021 08:15:21 -0700 Subject: [PATCH 10/34] remove header --- 
cpp/src/transform/transform.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/transform/transform.cpp b/cpp/src/transform/transform.cpp index d62a5e7076c..2a763a5909c 100644 --- a/cpp/src/transform/transform.cpp +++ b/cpp/src/transform/transform.cpp @@ -24,7 +24,6 @@ #include #include -#include #include #include From 13a94cbe69b4d3ef57e49b07ce06860e2a3bd3ac Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Fri, 3 Sep 2021 09:00:02 -0700 Subject: [PATCH 11/34] fix bool typing --- python/cudf/cudf/core/udf/lowering.py | 5 ++--- python/cudf/cudf/core/udf/typing.py | 4 ++-- python/cudf/cudf/tests/test_udf_masked_ops.py | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/python/cudf/cudf/core/udf/lowering.py b/python/cudf/cudf/core/udf/lowering.py index 1467a61f215..1638c3a6b77 100644 --- a/python/cudf/cudf/core/udf/lowering.py +++ b/python/cudf/cudf/core/udf/lowering.py @@ -154,9 +154,8 @@ def register_const_op(op): to_lower_op = make_const_op(op) cuda_lower(op, MaskedType, types.Number)(to_lower_op) cuda_lower(op, types.Number, MaskedType)(to_lower_op) - - # to_lower_op_reflected = make_reflected_const_op(op) - # cuda_lower(op, types.Number, MaskedType)(to_lower_op_reflected) + cuda_lower(op, MaskedType, types.Boolean)(to_lower_op) + cuda_lower(op, types.Boolean, MaskedType)(to_lower_op) # register all lowering at init diff --git a/python/cudf/cudf/core/udf/typing.py b/python/cudf/cudf/core/udf/typing.py index 6e026412f24..d053f4d605c 100644 --- a/python/cudf/cudf/core/udf/typing.py +++ b/python/cudf/cudf/core/udf/typing.py @@ -247,10 +247,10 @@ def generic(self, args, kws): # In the case of op(Masked, scalar), we resolve the type between # the Masked value_type and the scalar's type directly if isinstance(args[0], MaskedType) and isinstance( - args[1], types.Number + args[1], (types.Number, types.Boolean) ): to_resolve_types = (args[0].value_type, args[1]) - elif isinstance(args[0], types.Number) and isinstance( + elif 
isinstance(args[0], (types.Number, types.Boolean)) and isinstance( args[1], MaskedType ): to_resolve_types = (args[1].value_type, args[0]) diff --git a/python/cudf/cudf/tests/test_udf_masked_ops.py b/python/cudf/cudf/tests/test_udf_masked_ops.py index 621bb2b7fb4..c1211d663dc 100644 --- a/python/cudf/cudf/tests/test_udf_masked_ops.py +++ b/python/cudf/cudf/tests/test_udf_masked_ops.py @@ -78,7 +78,7 @@ def func_gdf(x, y): @pytest.mark.parametrize("op", arith_ops) -@pytest.mark.parametrize("constant", [1, 1.5]) +@pytest.mark.parametrize("constant", [1, 1.5, True, False]) def test_arith_masked_vs_constant(op, constant): def func_pdf(x): return op(x, constant) From b2a68e68464e9250e5fb1842e133fcf210c3b98c Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Fri, 3 Sep 2021 11:26:20 -0700 Subject: [PATCH 12/34] template kernels --- python/cudf/cudf/core/udf/pipeline.py | 102 ++++++++++++++------------ 1 file changed, 55 insertions(+), 47 deletions(-) diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index d14fa512192..e9324cd6f07 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -82,62 +82,70 @@ def mask_get(mask, pos): return (mask[pos // mask_bitsize] >> (pos % mask_bitsize)) & 1 -def _define_function(df, scalar_return=False): +kernel_template = """\ +def _kernel(retval, {input_columns}): + i = cuda.grid(1) + ret_data_arr, ret_mask_arr = retval +{masked_input_initializers} + ret = {user_udf_call} + ret_data_arr[i] = {ret_value} + ret_mask_arr[i] = {ret_valid} +""" + +unmasked_input_initializer_template = """\ + d_{idx}, m_{idx} = input_col_{idx} + masked_{idx} = Masked(d_{idx}[i], True) +""" + +masked_input_initializer_template = """\ + d_{idx}, m_{idx} = input_col_{idx} + masked_{idx} = Masked(d_{idx}[i], mask_get(m_{idx}, i)) +""" - start = "def _kernel(retval, " - sig = ", ".join(["input_col_" + str(i) for i in range(len(df.columns))]) - start += sig + "):\n" +def 
_define_function(df, scalar_return=False): + # Create argument list for kernel + input_columns = ", ".join([f'input_col_{i}' + for i in range(len(df.columns))]) - start += "\ti = cuda.grid(1)\n" - start += "\tret_data_arr, ret_mask_arr = retval\n" + # Create argument list to pass to device function + args = ', '.join([f'masked_{i}' for i in range(len(df.columns))]) + user_udf_call = f'f_({args})' - fargs = [] + # Generate the initializers for each device function argument + initializers = [] for i, col in enumerate(df._data.values()): - ii = str(i) - start += "\td_" + ii + "," + "m_" + ii + "=input_col_" + ii + "\n" - arg = "masked_" + ii + idx = str(i) if col.mask is not None: - start += ( - "\t" - + arg - + "=" - + "Masked(" - + "d_" - + ii - + "[i]" - + "," - + "mask_get(m_" - + ii - + "," - + "i)" - + ")\n" - ) + template = masked_input_initializer_template else: - start += ( - "\t" - + arg - + "=" - + "Masked(" - + "d_" - + ii - + "[i]" - + "," - + "True" - + ")\n" - ) - fargs.append(arg) - - fargs = "(" + ",".join(fargs) + ")\n" - start += "\tret = f_" + fargs + "\n" + template = unmasked_input_initializer_template + + initializer = template.format(**{'idx': idx}) + + initializers.append(initializer) + + masked_input_initializers = "\n".join(initializers) + + # Generate the code to extract the return value and mask from the device + # function's return value depending on whether it's already masked if scalar_return: - start += "\tret_data_arr[i] = ret\n" - start += "\tret_mask_arr[i] = True\n" + ret_value = 'ret' + ret_valid = 'True' else: - start += "\tret_data_arr[i] = ret.value\n" - start += "\tret_mask_arr[i] = ret.valid\n" - - return start + ret_value = 'ret.value' + ret_valid = 'ret.valid' + + # Incorporate all of the above into the kernel code template + d = { + 'input_columns': input_columns, + 'masked_input_initializers': masked_input_initializers, + 'user_udf_call': user_udf_call, + 'ret_value': ret_value, + 'ret_valid': ret_valid, + } + + return 
kernel_template.format(**d) def udf_pipeline(df, f): From 5cb75e7d702c2807d92db96c0bee014716c989b7 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Fri, 3 Sep 2021 11:38:57 -0700 Subject: [PATCH 13/34] switch back to forall --- python/cudf/cudf/core/udf/pipeline.py | 54 ++++++++++++++------------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index e9324cd6f07..f31ef85e56b 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -2,7 +2,7 @@ import numpy as np from numba import cuda from numba.np import numpy_support -from numba.types import Tuple, boolean, void +from numba.types import Tuple, boolean, int64, void from nvtx import annotate import cudf @@ -83,34 +83,36 @@ def mask_get(mask, pos): kernel_template = """\ -def _kernel(retval, {input_columns}): +def _kernel(retval, {input_columns}, size): i = cuda.grid(1) ret_data_arr, ret_mask_arr = retval + if i < size: {masked_input_initializers} - ret = {user_udf_call} - ret_data_arr[i] = {ret_value} - ret_mask_arr[i] = {ret_valid} + ret = {user_udf_call} + ret_data_arr[i] = {ret_value} + ret_mask_arr[i] = {ret_valid} """ unmasked_input_initializer_template = """\ - d_{idx}, m_{idx} = input_col_{idx} - masked_{idx} = Masked(d_{idx}[i], True) + d_{idx}, m_{idx} = input_col_{idx} + masked_{idx} = Masked(d_{idx}[i], True) """ masked_input_initializer_template = """\ - d_{idx}, m_{idx} = input_col_{idx} - masked_{idx} = Masked(d_{idx}[i], mask_get(m_{idx}, i)) + d_{idx}, m_{idx} = input_col_{idx} + masked_{idx} = Masked(d_{idx}[i], mask_get(m_{idx}, i)) """ def _define_function(df, scalar_return=False): # Create argument list for kernel - input_columns = ", ".join([f'input_col_{i}' - for i in range(len(df.columns))]) + input_columns = ", ".join( + [f"input_col_{i}" for i in range(len(df.columns))] + ) # Create argument list to pass to device function - args = ', 
'.join([f'masked_{i}' for i in range(len(df.columns))]) - user_udf_call = f'f_({args})' + args = ", ".join([f"masked_{i}" for i in range(len(df.columns))]) + user_udf_call = f"f_({args})" # Generate the initializers for each device function argument initializers = [] @@ -121,7 +123,7 @@ def _define_function(df, scalar_return=False): else: template = unmasked_input_initializer_template - initializer = template.format(**{'idx': idx}) + initializer = template.format(**{"idx": idx}) initializers.append(initializer) @@ -130,19 +132,19 @@ def _define_function(df, scalar_return=False): # Generate the code to extract the return value and mask from the device # function's return value depending on whether it's already masked if scalar_return: - ret_value = 'ret' - ret_valid = 'True' + ret_value = "ret" + ret_valid = "True" else: - ret_value = 'ret.value' - ret_valid = 'ret.valid' + ret_value = "ret.value" + ret_valid = "ret.valid" # Incorporate all of the above into the kernel code template d = { - 'input_columns': input_columns, - 'masked_input_initializers': masked_input_initializers, - 'user_udf_call': user_udf_call, - 'ret_value': ret_value, - 'ret_valid': ret_valid, + "input_columns": input_columns, + "masked_input_initializers": masked_input_initializers, + "user_udf_call": user_udf_call, + "ret_value": ret_value, + "ret_valid": ret_valid, } return kernel_template.format(**d) @@ -158,7 +160,8 @@ def udf_pipeline(df, f): ) return_type = Tuple((scalar_return_type[::1], boolean[::1])) sig = void( - return_type, *[masked_arrty_from_np_type(dtype) for dtype in df.dtypes] + return_type, + *[masked_arrty_from_np_type(dtype) for dtype in df.dtypes] + [int64], ) f_ = cuda.jit(device=True)(f) @@ -184,6 +187,7 @@ def udf_pipeline(df, f): mask = cudf.core.buffer.Buffer() launch_args.append((data, mask)) - kernel[1, len(df)](*launch_args) + launch_args.append(len(df)) # size + kernel.forall(len(df))(*launch_args) result = cudf.Series(ans_col).set_mask(bools_to_mask(ans_mask)) 
return result From 49d99781b01b1555b5c1c14a6e09b025d29c8c53 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Fri, 3 Sep 2021 12:06:11 -0700 Subject: [PATCH 14/34] implement construct_signature --- python/cudf/cudf/core/udf/pipeline.py | 32 ++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index f31ef85e56b..186fa6162d7 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -76,6 +76,31 @@ def masked_arrty_from_np_type(dtype): nb_scalar_ty = numpy_support.from_dtype(dtype) return Tuple((nb_scalar_ty[::1], libcudf_bitmask_type[::1])) +def construct_signature(df, return_type): + """ + Build the signature of numba types that will be used to + actually JIT the kernel itself later, accounting for types + and offsets + """ + + # Tuple of arrays, first the output data array, then the mask + return_type = Tuple((return_type[::1], boolean[::1])) + offsets = [] + sig = [return_type] + for col in df._data.values(): + sig.append(masked_arrty_from_np_type(col.dtype)) + offsets.append(col.offset) + + # return_type + data,masks + offsets + size + #sig = void( + # *(sig + offsets + [int64]) + #) + sig = void( + *(sig + [int64]) + ) + + return sig + @cuda.jit(device=True) def mask_get(mask, pos): @@ -158,11 +183,8 @@ def udf_pipeline(df, f): if _is_scalar_return else numba_return_type.value_type ) - return_type = Tuple((scalar_return_type[::1], boolean[::1])) - sig = void( - return_type, - *[masked_arrty_from_np_type(dtype) for dtype in df.dtypes] + [int64], - ) + + sig = construct_signature(df, scalar_return_type) f_ = cuda.jit(device=True)(f) # Set f_launch into the global namespace From 2ba8bd2a2047d8ff85c554142f5612316062f8a8 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Fri, 3 Sep 2021 12:29:12 -0700 Subject: [PATCH 15/34] support offsets --- python/cudf/cudf/core/udf/pipeline.py | 21 ++++++++++++--------- 
1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index 186fa6162d7..6f55b307f61 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -76,6 +76,7 @@ def masked_arrty_from_np_type(dtype): nb_scalar_ty = numpy_support.from_dtype(dtype) return Tuple((nb_scalar_ty[::1], libcudf_bitmask_type[::1])) + def construct_signature(df, return_type): """ Build the signature of numba types that will be used to @@ -89,15 +90,10 @@ def construct_signature(df, return_type): sig = [return_type] for col in df._data.values(): sig.append(masked_arrty_from_np_type(col.dtype)) - offsets.append(col.offset) + offsets.append(int64) # return_type + data,masks + offsets + size - #sig = void( - # *(sig + offsets + [int64]) - #) - sig = void( - *(sig + [int64]) - ) + sig = void(*(sig + offsets + [int64])) return sig @@ -108,7 +104,7 @@ def mask_get(mask, pos): kernel_template = """\ -def _kernel(retval, {input_columns}, size): +def _kernel(retval, {input_columns}, {input_offsets}, size): i = cuda.grid(1) ret_data_arr, ret_mask_arr = retval if i < size: @@ -125,7 +121,7 @@ def _kernel(retval, {input_columns}, size): masked_input_initializer_template = """\ d_{idx}, m_{idx} = input_col_{idx} - masked_{idx} = Masked(d_{idx}[i], mask_get(m_{idx}, i)) + masked_{idx} = Masked(d_{idx}[i], mask_get(m_{idx}, i + offset_{idx})) """ @@ -135,6 +131,8 @@ def _define_function(df, scalar_return=False): [f"input_col_{i}" for i in range(len(df.columns))] ) + input_offsets = ", ".join([f"offset_{i}" for i in range(len(df.columns))]) + # Create argument list to pass to device function args = ", ".join([f"masked_{i}" for i in range(len(df.columns))]) user_udf_call = f"f_({args})" @@ -166,6 +164,7 @@ def _define_function(df, scalar_return=False): # Incorporate all of the above into the kernel code template d = { "input_columns": input_columns, + "input_offsets": input_offsets, 
"masked_input_initializers": masked_input_initializers, "user_udf_call": user_udf_call, "ret_value": ret_value, @@ -202,13 +201,17 @@ def udf_pipeline(df, f): ) ans_mask = cudf.core.column.column_empty(len(df), dtype="bool") launch_args = [(ans_col, ans_mask)] + offsets = [] for col in df.columns: data = df[col]._column.data mask = df[col]._column.mask if mask is None: mask = cudf.core.buffer.Buffer() launch_args.append((data, mask)) + offsets.append(df[col]._column.offset) + launch_args += offsets + breakpoint() launch_args.append(len(df)) # size kernel.forall(len(df))(*launch_args) result = cudf.Series(ans_col).set_mask(bools_to_mask(ans_mask)) From 11b2fd10bc79d56b8e44212cd54bb7322031fab5 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Fri, 3 Sep 2021 13:27:40 -0700 Subject: [PATCH 16/34] cache kernels --- python/cudf/cudf/core/udf/pipeline.py | 20 +++++++++++++++----- python/cudf/cudf/utils/cudautils.py | 10 ++++++++++ 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index 6f55b307f61..559cea02b8e 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -6,6 +6,7 @@ from nvtx import annotate import cudf +import cachetools from cudf._lib.transform import bools_to_mask from cudf.core.udf.classes import Masked from cudf.core.udf.typing import MaskedType @@ -13,6 +14,7 @@ libcudf_bitmask_type = numpy_support.from_dtype(np.dtype("int32")) mask_bitsize = np.dtype("int32").itemsize * 8 +_kernel_cache: cachetools.LRUCache = cachetools.LRUCache(maxsize=32) @annotate("NUMBA JIT", color="green", domain="cudf_python") @@ -124,7 +126,7 @@ def _kernel(retval, {input_columns}, {input_offsets}, size): masked_{idx} = Masked(d_{idx}[i], mask_get(m_{idx}, i + offset_{idx})) """ - +@annotate("DEFINE", color="yellow", domain="cudf_python") def _define_function(df, scalar_return=False): # Create argument list for kernel input_columns = ", ".join( 
@@ -174,8 +176,9 @@ def _define_function(df, scalar_return=False): return kernel_template.format(**d) +@annotate("UDF PIPELINE", color="black", domain="cudf_python") def udf_pipeline(df, f): - numba_return_type, _ = compile_masked_udf(f, df.dtypes) + numba_return_type, ptx = compile_masked_udf(f, df.dtypes) _is_scalar_return = not isinstance(numba_return_type, MaskedType) scalar_return_type = ( numba_return_type @@ -194,8 +197,16 @@ def udf_pipeline(df, f): lcl, ) _kernel = lcl["_kernel"] - # compile - kernel = cuda.jit(sig)(_kernel) + + # check to see if we already compiled this function + kernel_cache_key = cudautils._make_cache_key(_kernel, sig) + kernel_cache_key = (*kernel_cache_key, ptx) + if _kernel_cache.get(kernel_cache_key) is None: + kernel = cuda.jit(sig)(_kernel) + _kernel_cache[kernel_cache_key] = kernel + else: + kernel = _kernel_cache[kernel_cache_key] + ans_col = cupy.empty( len(df), dtype=numpy_support.as_dtype(scalar_return_type) ) @@ -211,7 +222,6 @@ def udf_pipeline(df, f): offsets.append(df[col]._column.offset) launch_args += offsets - breakpoint() launch_args.append(len(df)) # size kernel.forall(len(df))(*launch_args) result = cudf.Series(ans_col).set_mask(bools_to_mask(ans_mask)) diff --git a/python/cudf/cudf/utils/cudautils.py b/python/cudf/cudf/utils/cudautils.py index 727bbb1c345..ff826d5071a 100755 --- a/python/cudf/cudf/utils/cudautils.py +++ b/python/cudf/cudf/utils/cudautils.py @@ -209,6 +209,16 @@ def grouped_window_sizes_from_offset(arr, group_starts, offset): # closure variables to check for a hit. 
_udf_code_cache: cachetools.LRUCache = cachetools.LRUCache(maxsize=32) +def _make_cache_key(func, sig): + codebytes = func.__code__.co_code + if func.__closure__ is not None: + cvars = tuple([x.cell_contents for x in func.__closure__]) + cvarbytes = dumps(cvars) + else: + cvarbytes = b"" + + key = (sig, codebytes, cvarbytes) + return key def compile_udf(udf, type_signature): """Compile ``udf`` with `numba` From 775dd570e50f8013305bfaa3e50d28f45369cc2e Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 7 Sep 2021 13:50:58 -0700 Subject: [PATCH 17/34] style --- python/cudf/cudf/core/udf/pipeline.py | 3 ++- python/cudf/cudf/utils/cudautils.py | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index 559cea02b8e..aee62abf134 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -1,3 +1,4 @@ +import cachetools import cupy import numpy as np from numba import cuda @@ -6,7 +7,6 @@ from nvtx import annotate import cudf -import cachetools from cudf._lib.transform import bools_to_mask from cudf.core.udf.classes import Masked from cudf.core.udf.typing import MaskedType @@ -126,6 +126,7 @@ def _kernel(retval, {input_columns}, {input_offsets}, size): masked_{idx} = Masked(d_{idx}[i], mask_get(m_{idx}, i + offset_{idx})) """ + @annotate("DEFINE", color="yellow", domain="cudf_python") def _define_function(df, scalar_return=False): # Create argument list for kernel diff --git a/python/cudf/cudf/utils/cudautils.py b/python/cudf/cudf/utils/cudautils.py index ff826d5071a..4939b00b7ee 100755 --- a/python/cudf/cudf/utils/cudautils.py +++ b/python/cudf/cudf/utils/cudautils.py @@ -209,6 +209,7 @@ def grouped_window_sizes_from_offset(arr, group_starts, offset): # closure variables to check for a hit. 
_udf_code_cache: cachetools.LRUCache = cachetools.LRUCache(maxsize=32) + def _make_cache_key(func, sig): codebytes = func.__code__.co_code if func.__closure__ is not None: @@ -220,6 +221,7 @@ def _make_cache_key(func, sig): key = (sig, codebytes, cvarbytes) return key + def compile_udf(udf, type_signature): """Compile ``udf`` with `numba` From 04c38e688a144f312d6b931017b53300a11261a2 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 7 Sep 2021 19:23:38 -0700 Subject: [PATCH 18/34] skip cases where pandas null logic differs --- python/cudf/cudf/tests/test_udf_masked_ops.py | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/tests/test_udf_masked_ops.py b/python/cudf/cudf/tests/test_udf_masked_ops.py index c1211d663dc..9b98f4d33a2 100644 --- a/python/cudf/cudf/tests/test_udf_masked_ops.py +++ b/python/cudf/cudf/tests/test_udf_masked_ops.py @@ -76,6 +76,7 @@ def func_gdf(x, y): ) run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) +test_arith_masked_vs_constant_skip_cases = {(False, operator.truediv), (False, operator.floordiv), (False, operator.mod), (False, operator.pow)} @pytest.mark.parametrize("op", arith_ops) @pytest.mark.parametrize("constant", [1, 1.5, True, False]) @@ -90,11 +91,20 @@ def func_gdf(x): # Just a single column -> result will be all NA gdf = cudf.DataFrame({"data": [1, 2, None]}) + if (constant, op) in test_arith_masked_vs_constant_skip_cases: + # The following tests cases yield undefined behavior: + # - truediv(x, False) because its dividing by zero + # - floordiv(x, False) because its dividing by zero + # - mod(x, False) because its mod by zero, + # - pow(x, False) because we have an NA in the series and pandas + # insists that (NA**0 == 1) where we do not + pytest.skip() run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) +test_arith_masked_vs_constant_reflected_skip_cases = {(True, operator.pow), (1, operator.pow)} @pytest.mark.parametrize("op", arith_ops) 
-@pytest.mark.parametrize("constant", [1, 1.5]) +@pytest.mark.parametrize("constant", [1, 1.5, True, False]) def test_arith_masked_vs_constant_reflected(op, constant): def func_pdf(x): return op(constant, x) @@ -106,8 +116,16 @@ def func_gdf(x): # Just a single column -> result will be all NA gdf = cudf.DataFrame({"data": [1, 2, None]}) + if (constant, op) in test_arith_masked_vs_constant_reflected_skip_cases: + # The following tests cases yield differing results from pandas: + # - 1**NA + # - True**NA + # both due to pandas insisting that this is equal to 1. + pytest.skip() + run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) +test_arith_masked_vs_null_skip_cases = {operator.pow} @pytest.mark.parametrize("op", arith_ops) def test_arith_masked_vs_null(op): @@ -118,6 +136,11 @@ def func_pdf(x): def func_gdf(x): return op(x, cudf.NA) + if op in test_arith_masked_vs_null_skip_cases: + # Pow will fail, because pandas says 1**NA == 1 + # so our result will be different + pytest.skip() + gdf = cudf.DataFrame({"data": [1, None, 3]}) run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) From 7a01bdbd7da11d562826f4f9fd3c68ddd21460bd Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 7 Sep 2021 19:25:48 -0700 Subject: [PATCH 19/34] style --- cpp/src/transform/transform.cpp | 2 -- python/cudf/cudf/tests/test_udf_masked_ops.py | 20 ++++++++++++++++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/cpp/src/transform/transform.cpp b/cpp/src/transform/transform.cpp index 2a763a5909c..78bd74d1301 100644 --- a/cpp/src/transform/transform.cpp +++ b/cpp/src/transform/transform.cpp @@ -64,7 +64,6 @@ void unary_operation(mutable_column_view output, cudf::jit::get_data_ptr(input)); } - } // namespace jit } // namespace transformation @@ -103,5 +102,4 @@ std::unique_ptr transform(column_view const& input, return detail::transform(input, unary_udf, output_type, is_ptx, rmm::cuda_stream_default, mr); } - } // namespace cudf diff --git 
a/python/cudf/cudf/tests/test_udf_masked_ops.py b/python/cudf/cudf/tests/test_udf_masked_ops.py index 9b98f4d33a2..cf38fc20c87 100644 --- a/python/cudf/cudf/tests/test_udf_masked_ops.py +++ b/python/cudf/cudf/tests/test_udf_masked_ops.py @@ -76,7 +76,14 @@ def func_gdf(x, y): ) run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) -test_arith_masked_vs_constant_skip_cases = {(False, operator.truediv), (False, operator.floordiv), (False, operator.mod), (False, operator.pow)} + +test_arith_masked_vs_constant_skip_cases = { + (False, operator.truediv), + (False, operator.floordiv), + (False, operator.mod), + (False, operator.pow), +} + @pytest.mark.parametrize("op", arith_ops) @pytest.mark.parametrize("constant", [1, 1.5, True, False]) @@ -94,14 +101,19 @@ def func_gdf(x): if (constant, op) in test_arith_masked_vs_constant_skip_cases: # The following tests cases yield undefined behavior: # - truediv(x, False) because its dividing by zero - # - floordiv(x, False) because its dividing by zero + # - floordiv(x, False) because its dividing by zero # - mod(x, False) because its mod by zero, # - pow(x, False) because we have an NA in the series and pandas # insists that (NA**0 == 1) where we do not pytest.skip() run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) -test_arith_masked_vs_constant_reflected_skip_cases = {(True, operator.pow), (1, operator.pow)} + +test_arith_masked_vs_constant_reflected_skip_cases = { + (True, operator.pow), + (1, operator.pow), +} + @pytest.mark.parametrize("op", arith_ops) @pytest.mark.parametrize("constant", [1, 1.5, True, False]) @@ -125,8 +137,10 @@ def func_gdf(x): run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) + test_arith_masked_vs_null_skip_cases = {operator.pow} + @pytest.mark.parametrize("op", arith_ops) def test_arith_masked_vs_null(op): def func_pdf(x): From 627d19707c7fcd4381a3d2d9a290142e63db5576 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Wed, 8 Sep 2021 06:46:47 -0700 Subject: 
[PATCH 20/34] update tests slightly --- python/cudf/cudf/tests/test_udf_masked_ops.py | 76 ++++++++++--------- 1 file changed, 39 insertions(+), 37 deletions(-) diff --git a/python/cudf/cudf/tests/test_udf_masked_ops.py b/python/cudf/cudf/tests/test_udf_masked_ops.py index cf38fc20c87..1410a24ac7a 100644 --- a/python/cudf/cudf/tests/test_udf_masked_ops.py +++ b/python/cudf/cudf/tests/test_udf_masked_ops.py @@ -87,7 +87,8 @@ def func_gdf(x, y): @pytest.mark.parametrize("op", arith_ops) @pytest.mark.parametrize("constant", [1, 1.5, True, False]) -def test_arith_masked_vs_constant(op, constant): +@pytest.mark.parametrize("data", [[1, 2, cudf.NA]]) +def test_arith_masked_vs_constant(op, constant, data): def func_pdf(x): return op(x, constant) @@ -95,29 +96,30 @@ def func_pdf(x): def func_gdf(x): return op(x, constant) - # Just a single column -> result will be all NA - gdf = cudf.DataFrame({"data": [1, 2, None]}) - - if (constant, op) in test_arith_masked_vs_constant_skip_cases: - # The following tests cases yield undefined behavior: - # - truediv(x, False) because its dividing by zero - # - floordiv(x, False) because its dividing by zero - # - mod(x, False) because its mod by zero, - # - pow(x, False) because we have an NA in the series and pandas - # insists that (NA**0 == 1) where we do not - pytest.skip() + gdf = cudf.DataFrame({"data": data}) + + if constant is False and op in { + operator.mod, + operator.pow, + operator.truediv, + operator.floordiv, + }: + with pytest.xfail(): + # The following tests cases yield undefined behavior: + # - truediv(x, False) because its dividing by zero + # - floordiv(x, False) because its dividing by zero + # - mod(x, False) because its mod by zero, + # - pow(x, False) because we have an NA in the series and pandas + # insists that (NA**0 == 1) where we do not + run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) + return run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) 
-test_arith_masked_vs_constant_reflected_skip_cases = { - (True, operator.pow), - (1, operator.pow), -} - - @pytest.mark.parametrize("op", arith_ops) @pytest.mark.parametrize("constant", [1, 1.5, True, False]) -def test_arith_masked_vs_constant_reflected(op, constant): +@pytest.mark.parametrize("data", [[2, 3, cudf.NA], [1, cudf.NA, 1]]) +def test_arith_masked_vs_constant_reflected(op, constant, data): def func_pdf(x): return op(constant, x) @@ -126,23 +128,22 @@ def func_gdf(x): return op(constant, x) # Just a single column -> result will be all NA - gdf = cudf.DataFrame({"data": [1, 2, None]}) - - if (constant, op) in test_arith_masked_vs_constant_reflected_skip_cases: - # The following tests cases yield differing results from pandas: - # - 1**NA - # - True**NA - # both due to pandas insisting that this is equal to 1. - pytest.skip() - + gdf = cudf.DataFrame({"data": data}) + + if constant == 1 and op is operator.pow: + with pytest.xfail(): + # The following tests cases yield differing results from pandas: + # - 1**NA + # - True**NA + # both due to pandas insisting that this is equal to 1. + run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) + return run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) -test_arith_masked_vs_null_skip_cases = {operator.pow} - - @pytest.mark.parametrize("op", arith_ops) -def test_arith_masked_vs_null(op): +@pytest.mark.parametrize("data", [[1, cudf.NA, 3], [2, 3, cudf.NA]]) +def test_arith_masked_vs_null(op, data): def func_pdf(x): return op(x, pd.NA) @@ -150,12 +151,13 @@ def func_pdf(x): def func_gdf(x): return op(x, cudf.NA) - if op in test_arith_masked_vs_null_skip_cases: - # Pow will fail, because pandas says 1**NA == 1 - # so our result will be different - pytest.skip() + gdf = cudf.DataFrame({"data": data}) - gdf = cudf.DataFrame({"data": [1, None, 3]}) + if 1 in gdf["data"] and op is operator.pow: + with pytest.xfail(): + # In pandas, 1**NA == 1. 
+ run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) + return run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) From d3e2e0b5e9bf93357c5a42834d6bd828126b9101 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Wed, 8 Sep 2021 06:57:03 -0700 Subject: [PATCH 21/34] updates to pipeline.py --- python/cudf/cudf/core/udf/pipeline.py | 28 +++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index aee62abf134..005e4e9d851 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -14,7 +14,7 @@ libcudf_bitmask_type = numpy_support.from_dtype(np.dtype("int32")) mask_bitsize = np.dtype("int32").itemsize * 8 -_kernel_cache: cachetools.LRUCache = cachetools.LRUCache(maxsize=32) +precompiled: cachetools.LRUCache = cachetools.LRUCache(maxsize=32) @annotate("NUMBA JIT", color="green", domain="cudf_python") @@ -189,41 +189,45 @@ def udf_pipeline(df, f): sig = construct_signature(df, scalar_return_type) + # Create a device version of the f_ = cuda.jit(device=True)(f) - # Set f_launch into the global namespace + lcl = {} exec( + # Defines a kernel named "_kernel" in the lcl dict _define_function(df, scalar_return=_is_scalar_return), {"f_": f_, "cuda": cuda, "Masked": Masked, "mask_get": mask_get}, lcl, ) + # The python function definition representing the kernel _kernel = lcl["_kernel"] # check to see if we already compiled this function - kernel_cache_key = cudautils._make_cache_key(_kernel, sig) - kernel_cache_key = (*kernel_cache_key, ptx) - if _kernel_cache.get(kernel_cache_key) is None: + cache_key = cudautils._make_cache_key(_kernel, sig) + cache_key = (*cache_key, ptx) + if precompiled.get(cache_key) is None: kernel = cuda.jit(sig)(_kernel) - _kernel_cache[kernel_cache_key] = kernel + precompiled[cache_key] = kernel else: - kernel = _kernel_cache[kernel_cache_key] + kernel = 
precompiled[cache_key] + # Mask and data column preallocated ans_col = cupy.empty( len(df), dtype=numpy_support.as_dtype(scalar_return_type) ) ans_mask = cudf.core.column.column_empty(len(df), dtype="bool") launch_args = [(ans_col, ans_mask)] offsets = [] - for col in df.columns: - data = df[col]._column.data - mask = df[col]._column.mask + for col in df._data.values(): + data = col.data + mask = col.mask if mask is None: mask = cudf.core.buffer.Buffer() launch_args.append((data, mask)) - offsets.append(df[col]._column.offset) - + offsets.append(col.offset) launch_args += offsets launch_args.append(len(df)) # size kernel.forall(len(df))(*launch_args) + result = cudf.Series(ans_col).set_mask(bools_to_mask(ans_mask)) return result From 306f5e1d29c108d8de0dc6049bafe87d7198d136 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Mon, 13 Sep 2021 07:47:51 -0700 Subject: [PATCH 22/34] address many reviews --- python/cudf/cudf/core/frame.py | 24 ++++- python/cudf/cudf/core/udf/lowering.py | 25 +++++- python/cudf/cudf/core/udf/pipeline.py | 123 +++++++++++--------------- python/cudf/cudf/core/udf/typing.py | 25 ++++++ python/cudf/cudf/utils/cudautils.py | 22 ++--- 5 files changed, 133 insertions(+), 86 deletions(-) diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index cfbf66029a8..ea5e2bf3054 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -34,6 +34,7 @@ ) from cudf.core.column_accessor import ColumnAccessor from cudf.core.join import merge +from cudf.core.udf.pipeline import compile_or_get from cudf.core.window import Rolling from cudf.utils import ioutils from cudf.utils.docutils import copy_docstring @@ -1341,7 +1342,28 @@ def _apply(self, func): """ Apply `func` across the rows of the frame. 
""" - result = cudf.core.udf.pipeline.udf_pipeline(self, func) + kernel, retty = compile_or_get(self, func) + + # Mask and data column preallocated + ans_col = cupy.empty(len(self), dtype=retty) + ans_mask = cudf.core.column.column_empty(len(self), dtype="bool") + launch_args = [(ans_col, ans_mask)] + offsets = [] + for col in self._data.values(): + data = col.data + mask = col.mask + if mask is None: + mask = cudf.core.buffer.Buffer() + launch_args.append((data, mask)) + offsets.append(col.offset) + launch_args += offsets + launch_args.append(len(self)) # size + kernel.forall(len(self))(*launch_args) + + result = cudf.Series(ans_col).set_mask( + libcudf.transform.bools_to_mask(ans_mask) + ) + return result def rank( diff --git a/python/cudf/cudf/core/udf/lowering.py b/python/cudf/cudf/core/udf/lowering.py index 1638c3a6b77..39e342befb8 100644 --- a/python/cudf/cudf/core/udf/lowering.py +++ b/python/cudf/cudf/core/udf/lowering.py @@ -9,7 +9,7 @@ ) from numba.extending import lower_builtin, types -from cudf.core.udf.typing import MaskedType, NAType +from cudf.core.udf.typing import MaskedType, NAType, pack_return from . import classes from ._ops import arith_ops, comparison_ops @@ -193,6 +193,29 @@ def masked_scalar_is_null_impl(context, builder, sig, args): return builder.load(result) +# Main kernel always calls `pack_return` on whatever the user defined +# function returned. 
This returns the same data if its already a `Masked` +# else packs it up into a new one that is valid from the get go +@cuda_lower(pack_return, MaskedType) +def pack_return_masked_impl(context, builder, sig, args): + outdata = cgutils.create_struct_proxy(sig.return_type)( + context, builder, value=args[0] + ) + return outdata._getvalue() + + +def pack_return_scalar_impl(context, builder, sig, args): + outdata = cgutils.create_struct_proxy(sig.return_type)(context, builder) + outdata.value = args[0] + outdata.valid = context.get_constant(types.boolean, 1) + + return outdata._getvalue() + + +cuda_lower(pack_return, types.Number)(pack_return_scalar_impl) +cuda_lower(pack_return, types.Boolean)(pack_return_scalar_impl) + + @cuda_lower(operator.truth, MaskedType) def masked_scalar_truth_impl(context, builder, sig, args): indata = cgutils.create_struct_proxy(MaskedType(types.boolean))( diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index 005e4e9d851..12afd07a494 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -1,30 +1,30 @@ import cachetools -import cupy import numpy as np from numba import cuda from numba.np import numpy_support from numba.types import Tuple, boolean, int64, void from nvtx import annotate -import cudf -from cudf._lib.transform import bools_to_mask from cudf.core.udf.classes import Masked -from cudf.core.udf.typing import MaskedType +from cudf.core.udf.typing import MaskedType, pack_return from cudf.utils import cudautils libcudf_bitmask_type = numpy_support.from_dtype(np.dtype("int32")) -mask_bitsize = np.dtype("int32").itemsize * 8 +MASK_BITSIZE = np.dtype("int32").itemsize * 8 precompiled: cachetools.LRUCache = cachetools.LRUCache(maxsize=32) +cuda.jit(device=True)(pack_return) + @annotate("NUMBA JIT", color="green", domain="cudf_python") def compile_masked_udf(func, dtypes): """ - Generate an inlineable PTX function that will be injected into - a variadic 
kernel inside libcudf - - assume all input types are `MaskedType(input_col.dtype)` and then - compile the requestied PTX function as a function over those types + Compile a UDF with a signature of `MaskedType`s. Assumes a + signature of `MaskedType(dtype)` for each dtype in `dtypes`. + The UDFs logic (read from `func`s bytecode) is combined with + the typing logic in `typing.py` to determine the UDFs output + dtype and compile a string containing a PTX version of the + the function. """ to_compiler_sig = tuple( MaskedType(arg) @@ -73,7 +73,7 @@ def masked_arrty_from_np_type(dtype): Return a type representing a tuple of arrays, the first element an array of the numba type corresponding to `dtype`, and the second an - array of bools represe + array of bools representing a mask. """ nb_scalar_ty = numpy_support.from_dtype(dtype) return Tuple((nb_scalar_ty[::1], libcudf_bitmask_type[::1])) @@ -102,7 +102,7 @@ def construct_signature(df, return_type): @cuda.jit(device=True) def mask_get(mask, pos): - return (mask[pos // mask_bitsize] >> (pos % mask_bitsize)) & 1 + return (mask[pos // MASK_BITSIZE] >> (pos % MASK_BITSIZE)) & 1 kernel_template = """\ @@ -112,8 +112,9 @@ def _kernel(retval, {input_columns}, {input_offsets}, size): if i < size: {masked_input_initializers} ret = {user_udf_call} - ret_data_arr[i] = {ret_value} - ret_mask_arr[i] = {ret_valid} + ret_masked = pack_return(ret) + ret_data_arr[i] = ret_masked.value + ret_mask_arr[i] = ret_masked.valid """ unmasked_input_initializer_template = """\ @@ -127,17 +128,16 @@ def _kernel(retval, {input_columns}, {input_offsets}, size): """ -@annotate("DEFINE", color="yellow", domain="cudf_python") def _define_function(df, scalar_return=False): # Create argument list for kernel input_columns = ", ".join( [f"input_col_{i}" for i in range(len(df.columns))] ) - input_offsets = ", ".join([f"offset_{i}" for i in range(len(df.columns))]) + input_offsets = ", ".join([f"offset_{i}" for i in range(len(df._data))]) # Create argument 
list to pass to device function - args = ", ".join([f"masked_{i}" for i in range(len(df.columns))]) + args = ", ".join([f"masked_{i}" for i in range(len(df._data))]) user_udf_call = f"f_({args})" # Generate the initializers for each device function argument @@ -149,36 +149,36 @@ def _define_function(df, scalar_return=False): else: template = unmasked_input_initializer_template - initializer = template.format(**{"idx": idx}) + initializer = template.format(idx=idx) initializers.append(initializer) masked_input_initializers = "\n".join(initializers) - # Generate the code to extract the return value and mask from the device - # function's return value depending on whether it's already masked - if scalar_return: - ret_value = "ret" - ret_valid = "True" - else: - ret_value = "ret.value" - ret_valid = "ret.valid" - # Incorporate all of the above into the kernel code template d = { "input_columns": input_columns, "input_offsets": input_offsets, "masked_input_initializers": masked_input_initializers, "user_udf_call": user_udf_call, - "ret_value": ret_value, - "ret_valid": ret_valid, } return kernel_template.format(**d) -@annotate("UDF PIPELINE", color="black", domain="cudf_python") -def udf_pipeline(df, f): +@annotate("UDF COMPILATION", color="darkgreen", domain="cudf_python") +def compile_or_get(df, f): + """ + Return a compiled kernel in terms of MaskedTypes that launches a + kernel equivalent of `f` for the dtypes of `df`. The kernel uses + a thread for each row and calls `f` using that row's data / mask + to produce an output value and output validity for each row. + + If the UDF has already been compiled for the requested dtypes, + a cached version will be returned instead of running compilation.
+ + """ + numba_return_type, ptx = compile_masked_udf(f, df.dtypes) _is_scalar_return = not isinstance(numba_return_type, MaskedType) scalar_return_type = ( @@ -189,45 +189,30 @@ def udf_pipeline(df, f): sig = construct_signature(df, scalar_return_type) - # Create a device version of the - f_ = cuda.jit(device=True)(f) - - lcl = {} - exec( - # Defines a kernel named "_kernel" in the lcl dict - _define_function(df, scalar_return=_is_scalar_return), - {"f_": f_, "cuda": cuda, "Masked": Masked, "mask_get": mask_get}, - lcl, - ) - # The python function definition representing the kernel - _kernel = lcl["_kernel"] - # check to see if we already compiled this function - cache_key = cudautils._make_cache_key(_kernel, sig) - cache_key = (*cache_key, ptx) - if precompiled.get(cache_key) is None: + cache_key = cudautils._make_partial_cache_key(f) + cache_key = (*cache_key, sig) + if precompiled.get(cache_key) is not None: + kernel = precompiled[cache_key] + else: + f_ = cuda.jit(device=True)(f) + + lcl = {} + exec( + # Defines a kernel named "_kernel" in the lcl dict + _define_function(df, scalar_return=_is_scalar_return), + { + "f_": f_, + "cuda": cuda, + "Masked": Masked, + "mask_get": mask_get, + "pack_return": pack_return, + }, + lcl, + ) + # The python function definition representing the kernel + _kernel = lcl["_kernel"] kernel = cuda.jit(sig)(_kernel) precompiled[cache_key] = kernel - else: - kernel = precompiled[cache_key] - # Mask and data column preallocated - ans_col = cupy.empty( - len(df), dtype=numpy_support.as_dtype(scalar_return_type) - ) - ans_mask = cudf.core.column.column_empty(len(df), dtype="bool") - launch_args = [(ans_col, ans_mask)] - offsets = [] - for col in df._data.values(): - data = col.data - mask = col.mask - if mask is None: - mask = cudf.core.buffer.Buffer() - launch_args.append((data, mask)) - offsets.append(col.offset) - launch_args += offsets - launch_args.append(len(df)) # size - kernel.forall(len(df))(*launch_args) - - result = 
cudf.Series(ans_col).set_mask(bools_to_mask(ans_mask)) - return result + return kernel, numpy_support.as_dtype(scalar_return_type) diff --git a/python/cudf/cudf/core/udf/typing.py b/python/cudf/cudf/core/udf/typing.py index d053f4d605c..6f7c42165cf 100644 --- a/python/cudf/cudf/core/udf/typing.py +++ b/python/cudf/cudf/core/udf/typing.py @@ -287,6 +287,31 @@ def generic(self, args, kws): return nb_signature(types.boolean, MaskedType(types.boolean)) +def pack_return(masked_or_scalar): + # Blank function to give us something for the typing and + # lowering to grab onto. Just a dummy function for us to + # call within kernels that will get replaced later by the + # lowered implementation + pass + + +@cuda_decl_registry.register_global(pack_return) +class UnpackReturnToMasked(AbstractTemplate): + """ + Turn a returned MaskedType into its value and validity + or turn a scalar into the tuple (scalar, True). + """ + + def generic(self, args, kws): + if isinstance(args[0], MaskedType): + # MaskedType(dtype, valid) -> MaskedType(dtype, valid) + return nb_signature(args[0], args[0]) + elif isinstance(args[0], (types.Number, types.Boolean)): + # scalar_type -> MaskedType(scalar_type, True) + return_type = MaskedType(args[0]) + return nb_signature(return_type, args[0]) + + for op in arith_ops + comparison_ops: # Every op shares the same typing class cuda_decl_registry.register_global(op)(MaskedScalarArithOp) diff --git a/python/cudf/cudf/utils/cudautils.py b/python/cudf/cudf/utils/cudautils.py index 4939b00b7ee..f43b512792b 100755 --- a/python/cudf/cudf/utils/cudautils.py +++ b/python/cudf/cudf/utils/cudautils.py @@ -210,16 +210,14 @@ def grouped_window_sizes_from_offset(arr, group_starts, offset): _udf_code_cache: cachetools.LRUCache = cachetools.LRUCache(maxsize=32) -def _make_cache_key(func, sig): - codebytes = func.__code__.co_code - if func.__closure__ is not None: - cvars = tuple([x.cell_contents for x in func.__closure__]) +def _make_partial_cache_key(udf): + codebytes 
= udf.__code__.co_code + if udf.__closure__ is not None: + cvars = tuple([x.cell_contents for x in udf.__closure__]) cvarbytes = dumps(cvars) else: cvarbytes = b"" - - key = (sig, codebytes, cvarbytes) - return key + return codebytes, cvarbytes def compile_udf(udf, type_signature): @@ -256,14 +254,8 @@ def compile_udf(udf, type_signature): # Check if we've already compiled a similar (but possibly distinct) # function before - codebytes = udf.__code__.co_code - if udf.__closure__ is not None: - cvars = tuple([x.cell_contents for x in udf.__closure__]) - cvarbytes = dumps(cvars) - else: - cvarbytes = b"" - - key = (type_signature, codebytes, cvarbytes) + partial_key = _make_partial_cache_key(udf) + key = (type_signature, *partial_key) res = _udf_code_cache.get(key) if res: return res From 05adec79aeb2a000e9df8461c85e17cb8934233b Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Mon, 13 Sep 2021 07:48:49 -0700 Subject: [PATCH 23/34] cleanup --- python/cudf/cudf/tests/test_udf_masked_ops.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/python/cudf/cudf/tests/test_udf_masked_ops.py b/python/cudf/cudf/tests/test_udf_masked_ops.py index 1410a24ac7a..d5fee232666 100644 --- a/python/cudf/cudf/tests/test_udf_masked_ops.py +++ b/python/cudf/cudf/tests/test_udf_masked_ops.py @@ -77,14 +77,6 @@ def func_gdf(x, y): run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) -test_arith_masked_vs_constant_skip_cases = { - (False, operator.truediv), - (False, operator.floordiv), - (False, operator.mod), - (False, operator.pow), -} - - @pytest.mark.parametrize("op", arith_ops) @pytest.mark.parametrize("constant", [1, 1.5, True, False]) @pytest.mark.parametrize("data", [[1, 2, cudf.NA]]) From edbae6c8db786492a36b6f3deef73b1921c64a8f Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Mon, 13 Sep 2021 08:13:31 -0700 Subject: [PATCH 24/34] minor updtes --- python/cudf/cudf/core/udf/pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index 12afd07a494..a24e48d64fd 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -38,7 +38,7 @@ def compile_masked_udf(func, dtypes): else: numba_output_type = output_type - return numba_output_type, ptx + return numba_output_type def nulludf(func): @@ -179,7 +179,7 @@ def compile_or_get(df, f): """ - numba_return_type, ptx = compile_masked_udf(f, df.dtypes) + numba_return_type = compile_masked_udf(f, df.dtypes) _is_scalar_return = not isinstance(numba_return_type, MaskedType) scalar_return_type = ( numba_return_type From e224bee060f92107d82a5e5fd1c47f95c623f202 Mon Sep 17 00:00:00 2001 From: brandon-b-miller <53796099+brandon-b-miller@users.noreply.github.com> Date: Tue, 14 Sep 2021 09:48:34 -0500 Subject: [PATCH 25/34] Apply suggestions from code review Co-authored-by: Graham Markall <535640+gmarkall@users.noreply.github.com> --- python/cudf/cudf/core/udf/lowering.py | 11 +++-------- python/cudf/cudf/core/udf/pipeline.py | 12 +++++------- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/python/cudf/cudf/core/udf/lowering.py b/python/cudf/cudf/core/udf/lowering.py index 39e342befb8..f8331aa9153 100644 --- a/python/cudf/cudf/core/udf/lowering.py +++ b/python/cudf/cudf/core/udf/lowering.py @@ -198,12 +198,11 @@ def masked_scalar_is_null_impl(context, builder, sig, args): # else packs it up into a new one that is valid from the get go @cuda_lower(pack_return, MaskedType) def pack_return_masked_impl(context, builder, sig, args): - outdata = cgutils.create_struct_proxy(sig.return_type)( - context, builder, value=args[0] - ) - return outdata._getvalue() + return args[0] +@cuda_lower(pack_return, types.Boolean) +@cuda_lower(pack_return, types.Number) def pack_return_scalar_impl(context, builder, sig, args): outdata = cgutils.create_struct_proxy(sig.return_type)(context, builder) outdata.value = args[0] @@ -212,10 +211,6 @@ def 
pack_return_scalar_impl(context, builder, sig, args): return outdata._getvalue() -cuda_lower(pack_return, types.Number)(pack_return_scalar_impl) -cuda_lower(pack_return, types.Boolean)(pack_return_scalar_impl) - - @cuda_lower(operator.truth, MaskedType) def masked_scalar_truth_impl(context, builder, sig, args): indata = cgutils.create_struct_proxy(MaskedType(types.boolean))( diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index a24e48d64fd..e00c09343d0 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -19,18 +19,16 @@ @annotate("NUMBA JIT", color="green", domain="cudf_python") def compile_masked_udf(func, dtypes): """ - Compile a UDF with a signature of `MaskedType`s. Assumes a - signature of `MaskedType(dtype)` for each dtype in `dtypes`. - The UDFs logic (read from `func`s bytecode) is combined with - the typing logic in `typing.py` to determine the UDFs output - dtype and compile a string containing a PTX version of the - the function. + Get the return type of a masked UDF for a given set of argument dtypes. It + is assumed that a `MaskedType(dtype)` is passed to the function for each + input dtype. """ to_compiler_sig = tuple( MaskedType(arg) for arg in (numpy_support.from_dtype(np_type) for np_type in dtypes) ) - # Get the inlineable PTX function + # Get the return type. The PTX is also returned by compile_udf, but is not + # needed here. 
ptx, output_type = cudautils.compile_udf(func, to_compiler_sig) if not isinstance(output_type, MaskedType): From a446b75c889fef9b95266169a6e7e428e5f0094b Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 14 Sep 2021 08:39:07 -0700 Subject: [PATCH 26/34] address reviews --- .../cudf/cudf/core/udf/{classes.py => api.py} | 8 +++++ python/cudf/cudf/core/udf/lowering.py | 12 +++---- python/cudf/cudf/core/udf/pipeline.py | 36 ++++++++++--------- python/cudf/cudf/core/udf/typing.py | 28 ++++++--------- python/cudf/cudf/utils/cudautils.py | 7 ++-- 5 files changed, 46 insertions(+), 45 deletions(-) rename python/cudf/cudf/core/udf/{classes.py => api.py} (69%) diff --git a/python/cudf/cudf/core/udf/classes.py b/python/cudf/cudf/core/udf/api.py similarity index 69% rename from python/cudf/cudf/core/udf/classes.py rename to python/cudf/cudf/core/udf/api.py index fe2fbd9daad..23b4d02c57d 100644 --- a/python/cudf/cudf/core/udf/classes.py +++ b/python/cudf/cudf/core/udf/api.py @@ -14,3 +14,11 @@ class Masked: def __init__(self, value, valid): self.value = value self.valid = valid + + +def pack_return(masked_or_scalar): + # Blank function to give us something for the typing and + # lowering to grab onto. Just a dummy function for us to + # call within kernels that will get replaced later by the + # lowered implementation + pass diff --git a/python/cudf/cudf/core/udf/lowering.py b/python/cudf/cudf/core/udf/lowering.py index f8331aa9153..3c0dfbb300a 100644 --- a/python/cudf/cudf/core/udf/lowering.py +++ b/python/cudf/cudf/core/udf/lowering.py @@ -9,9 +9,9 @@ ) from numba.extending import lower_builtin, types -from cudf.core.udf.typing import MaskedType, NAType, pack_return +from cudf.core.udf.typing import MaskedType, NAType -from . import classes +from . 
import api from ._ops import arith_ops, comparison_ops @@ -196,13 +196,13 @@ def masked_scalar_is_null_impl(context, builder, sig, args): # Main kernel always calls `pack_return` on whatever the user defined # function returned. This returns the same data if its already a `Masked` # else packs it up into a new one that is valid from the get go -@cuda_lower(pack_return, MaskedType) +@cuda_lower(api.pack_return, MaskedType) def pack_return_masked_impl(context, builder, sig, args): return args[0] -@cuda_lower(pack_return, types.Boolean) -@cuda_lower(pack_return, types.Number) +@cuda_lower(api.pack_return, types.Boolean) +@cuda_lower(api.pack_return, types.Number) def pack_return_scalar_impl(context, builder, sig, args): outdata = cgutils.create_struct_proxy(sig.return_type)(context, builder) outdata.value = args[0] @@ -270,7 +270,7 @@ def cast_masked_to_masked(context, builder, fromty, toty, val): # Masked constructor for use in a kernel for testing -@lower_builtin(classes.Masked, types.Number, types.boolean) +@lower_builtin(api.Masked, types.Number, types.boolean) def masked_constructor(context, builder, sig, args): ty = sig.return_type value, valid = args diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index e00c09343d0..699e3834696 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -5,8 +5,8 @@ from numba.types import Tuple, boolean, int64, void from nvtx import annotate -from cudf.core.udf.classes import Masked -from cudf.core.udf.typing import MaskedType, pack_return +from cudf.core.udf.api import Masked, pack_return +from cudf.core.udf.typing import MaskedType from cudf.utils import cudautils libcudf_bitmask_type = numpy_support.from_dtype(np.dtype("int32")) @@ -17,7 +17,7 @@ @annotate("NUMBA JIT", color="green", domain="cudf_python") -def compile_masked_udf(func, dtypes): +def get_udf_return_type(func, dtypes): """ Get the return type of a masked UDF for a given set of 
argument dtypes. It is assumed that a `MaskedType(dtype)` is passed to the function for each @@ -177,22 +177,24 @@ def compile_or_get(df, f): """ - numba_return_type = compile_masked_udf(f, df.dtypes) - _is_scalar_return = not isinstance(numba_return_type, MaskedType) - scalar_return_type = ( - numba_return_type - if _is_scalar_return - else numba_return_type.value_type - ) - - sig = construct_signature(df, scalar_return_type) - # check to see if we already compiled this function - cache_key = cudautils._make_partial_cache_key(f) - cache_key = (*cache_key, sig) + cache_key = ( + *cudautils.make_cache_key(f, tuple(df.dtypes)), + *(col.mask is None for col in df._data.values()), + ) if precompiled.get(cache_key) is not None: - kernel = precompiled[cache_key] + kernel, scalar_return_type = precompiled[cache_key] else: + + numba_return_type = get_udf_return_type(f, df.dtypes) + _is_scalar_return = not isinstance(numba_return_type, MaskedType) + scalar_return_type = ( + numba_return_type + if _is_scalar_return + else numba_return_type.value_type + ) + + sig = construct_signature(df, scalar_return_type) f_ = cuda.jit(device=True)(f) lcl = {} @@ -211,6 +213,6 @@ def compile_or_get(df, f): # The python function definition representing the kernel _kernel = lcl["_kernel"] kernel = cuda.jit(sig)(_kernel) - precompiled[cache_key] = kernel + precompiled[cache_key] = (kernel, scalar_return_type) return kernel, numpy_support.as_dtype(scalar_return_type) diff --git a/python/cudf/cudf/core/udf/typing.py b/python/cudf/cudf/core/udf/typing.py index 6f7c42165cf..cfddfed6f22 100644 --- a/python/cudf/cudf/core/udf/typing.py +++ b/python/cudf/cudf/core/udf/typing.py @@ -17,7 +17,7 @@ from numba.cuda.cudadecl import registry as cuda_decl_registry from pandas._libs.missing import NAType as _NAType -from . import classes +from . 
import api from ._ops import arith_ops, comparison_ops @@ -101,7 +101,7 @@ def __eq__(self, other): # For typing a Masked constant value defined outside a kernel (e.g. captured in # a closure). -@typeof_impl.register(classes.Masked) +@typeof_impl.register(api.Masked) def typeof_masked(val, c): return MaskedType(typeof(val.value)) @@ -110,7 +110,7 @@ def typeof_masked(val, c): # type in a kernel. @cuda_decl_registry.register class MaskedConstructor(ConcreteTemplate): - key = classes.Masked + key = api.Masked cases = [ nb_signature(MaskedType(t), t, types.boolean) @@ -123,20 +123,20 @@ class MaskedConstructor(ConcreteTemplate): make_attribute_wrapper(MaskedType, "valid", "valid") -# Typing for `classes.Masked` +# Typing for `api.Masked` @cuda_decl_registry.register_attr class ClassesTemplate(AttributeTemplate): - key = types.Module(classes) + key = types.Module(api) def resolve_Masked(self, mod): return types.Function(MaskedConstructor) -# Registration of the global is also needed for Numba to type classes.Masked -cuda_decl_registry.register_global(classes, types.Module(classes)) -# For typing bare Masked (as in `from .classes import Masked` +# Registration of the global is also needed for Numba to type api.Masked +cuda_decl_registry.register_global(api, types.Module(api)) +# For typing bare Masked (as in `from .api import Masked` cuda_decl_registry.register_global( - classes.Masked, types.Function(MaskedConstructor) + api.Masked, types.Function(MaskedConstructor) ) @@ -287,15 +287,7 @@ def generic(self, args, kws): return nb_signature(types.boolean, MaskedType(types.boolean)) -def pack_return(masked_or_scalar): - # Blank function to give us something for the typing and - # lowering to grab onto. 
Just a dummy function for us to - # call within kernels that will get replaced later by the - # lowered implementation - pass - - -@cuda_decl_registry.register_global(pack_return) +@cuda_decl_registry.register_global(api.pack_return) class UnpackReturnToMasked(AbstractTemplate): """ Turn a returned MaskedType into its value and validity diff --git a/python/cudf/cudf/utils/cudautils.py b/python/cudf/cudf/utils/cudautils.py index f43b512792b..7b7fe674210 100755 --- a/python/cudf/cudf/utils/cudautils.py +++ b/python/cudf/cudf/utils/cudautils.py @@ -210,14 +210,14 @@ def grouped_window_sizes_from_offset(arr, group_starts, offset): _udf_code_cache: cachetools.LRUCache = cachetools.LRUCache(maxsize=32) -def _make_partial_cache_key(udf): +def make_cache_key(udf, sig): codebytes = udf.__code__.co_code if udf.__closure__ is not None: cvars = tuple([x.cell_contents for x in udf.__closure__]) cvarbytes = dumps(cvars) else: cvarbytes = b"" - return codebytes, cvarbytes + return codebytes, cvarbytes, sig def compile_udf(udf, type_signature): @@ -254,8 +254,7 @@ def compile_udf(udf, type_signature): # Check if we've already compiled a similar (but possibly distinct) # function before - partial_key = _make_partial_cache_key(udf) - key = (type_signature, *partial_key) + key = make_cache_key(udf, type_signature) res = _udf_code_cache.get(key) if res: return res From b54e11ed51853b2879e96cb78dcd4cf780c33bdd Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 14 Sep 2021 08:48:51 -0700 Subject: [PATCH 27/34] remove creating buffers if the column has no mask --- python/cudf/cudf/core/frame.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index ea5e2bf3054..3bb43eff9ef 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -1352,8 +1352,6 @@ def _apply(self, func): for col in self._data.values(): data = col.data mask = col.mask - if mask is None: - mask = cudf.core.buffer.Buffer() 
launch_args.append((data, mask)) offsets.append(col.offset) launch_args += offsets From 16406ffa5c942789d50810d5323ff5a9581e334b Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 14 Sep 2021 13:38:12 -0700 Subject: [PATCH 28/34] put buffer back in for blank mask for now --- python/cudf/cudf/core/frame.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 3bb43eff9ef..ea5e2bf3054 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -1352,6 +1352,8 @@ def _apply(self, func): for col in self._data.values(): data = col.data mask = col.mask + if mask is None: + mask = cudf.core.buffer.Buffer() launch_args.append((data, mask)) offsets.append(col.offset) launch_args += offsets From 30d60132fa3cc2249c4e2ed8096d78c9869f5935 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Fri, 17 Sep 2021 07:17:07 -0700 Subject: [PATCH 29/34] fix import bug --- python/cudf/cudf/tests/test_extension_compilation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/tests/test_extension_compilation.py b/python/cudf/cudf/tests/test_extension_compilation.py index e527fd0af17..39fa7b11ce2 100644 --- a/python/cudf/cudf/tests/test_extension_compilation.py +++ b/python/cudf/cudf/tests/test_extension_compilation.py @@ -5,7 +5,7 @@ from numba.cuda import compile_ptx from cudf import NA -from cudf.core.udf.classes import Masked +from cudf.core.udf.api import Masked from cudf.core.udf.typing import MaskedType arith_ops = ( From a36964123adef59247b05f9d6aa75fb1807315c5 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Fri, 17 Sep 2021 07:31:10 -0700 Subject: [PATCH 30/34] clarify exec context --- python/cudf/cudf/core/udf/pipeline.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index 699e3834696..03e16621e96 100644 --- 
a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -197,21 +197,22 @@ def compile_or_get(df, f): sig = construct_signature(df, scalar_return_type) f_ = cuda.jit(device=True)(f) - lcl = {} - exec( - # Defines a kernel named "_kernel" in the lcl dict - _define_function(df, scalar_return=_is_scalar_return), - { + # Dict of 'local' variables into which `_kernel` is defined + local_exec_context = {} + global_exec_context = { "f_": f_, "cuda": cuda, "Masked": Masked, "mask_get": mask_get, "pack_return": pack_return, - }, - lcl, + } + exec( + _define_function(df, scalar_return=_is_scalar_return), + global_exec_context, + local_exec_context, ) # The python function definition representing the kernel - _kernel = lcl["_kernel"] + _kernel = local_exec_context["_kernel"] kernel = cuda.jit(sig)(_kernel) precompiled[cache_key] = (kernel, scalar_return_type) From 3c0c76fb13edd23a2db7e5abec7fd8624ed11c1c Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Fri, 17 Sep 2021 09:03:57 -0700 Subject: [PATCH 31/34] rework unmasked kernels slightly --- python/cudf/cudf/core/frame.py | 5 +++-- python/cudf/cudf/core/udf/pipeline.py | 25 ++++++++++++++----------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 1074c80d6a0..e4a9fe35c0b 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -1443,8 +1443,9 @@ def _apply(self, func): data = col.data mask = col.mask if mask is None: - mask = cudf.core.buffer.Buffer() - launch_args.append((data, mask)) + launch_args.append(data) + else: + launch_args.append((data, mask)) offsets.append(col.offset) launch_args += offsets launch_args.append(len(self)) # size diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index 03e16621e96..50f625937fa 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -66,15 +66,18 @@ def 
wrapper(*args): return wrapper -def masked_arrty_from_np_type(dtype): +def masked_array_type_from_col(col): """ Return a type representing a tuple of arrays, the first element an array of the numba type corresponding to `dtype`, and the second an array of bools representing a mask. """ - nb_scalar_ty = numpy_support.from_dtype(dtype) - return Tuple((nb_scalar_ty[::1], libcudf_bitmask_type[::1])) + nb_scalar_ty = numpy_support.from_dtype(col.dtype) + if col.mask is None: + return nb_scalar_ty[::1] + else: + return Tuple((nb_scalar_ty[::1], libcudf_bitmask_type[::1])) def construct_signature(df, return_type): @@ -89,7 +92,7 @@ def construct_signature(df, return_type): offsets = [] sig = [return_type] for col in df._data.values(): - sig.append(masked_arrty_from_np_type(col.dtype)) + sig.append(masked_array_type_from_col(col)) offsets.append(int64) # return_type + data,masks + offsets + size @@ -116,7 +119,7 @@ def _kernel(retval, {input_columns}, {input_offsets}, size): """ unmasked_input_initializer_template = """\ - d_{idx}, m_{idx} = input_col_{idx} + d_{idx} = input_col_{idx} masked_{idx} = Masked(d_{idx}[i], True) """ @@ -200,12 +203,12 @@ def compile_or_get(df, f): # Dict of 'local' variables into which `_kernel` is defined local_exec_context = {} global_exec_context = { - "f_": f_, - "cuda": cuda, - "Masked": Masked, - "mask_get": mask_get, - "pack_return": pack_return, - } + "f_": f_, + "cuda": cuda, + "Masked": Masked, + "mask_get": mask_get, + "pack_return": pack_return, + } exec( _define_function(df, scalar_return=_is_scalar_return), global_exec_context, From 6deb96a075c1fd8eef672d217b82a6a4bf43578a Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 21 Sep 2021 10:58:07 -0700 Subject: [PATCH 32/34] un purge c++ --- .../Modules/JitifyPreprocessKernels.cmake | 1 + cpp/include/cudf/transform.hpp | 6 ++ cpp/src/transform/jit/masked_udf_kernel.cu | 85 +++++++++++++++ cpp/src/transform/transform.cpp | 102 +++++++++++++++++- 
python/cudf/cudf/_lib/cpp/transform.pxd | 6 ++ python/cudf/cudf/_lib/transform.pyx | 24 +++++ 6 files changed, 223 insertions(+), 1 deletion(-) create mode 100644 cpp/src/transform/jit/masked_udf_kernel.cu diff --git a/cpp/cmake/Modules/JitifyPreprocessKernels.cmake b/cpp/cmake/Modules/JitifyPreprocessKernels.cmake index eb1ade61440..7e2ec5254d3 100644 --- a/cpp/cmake/Modules/JitifyPreprocessKernels.cmake +++ b/cpp/cmake/Modules/JitifyPreprocessKernels.cmake @@ -56,6 +56,7 @@ endfunction() jit_preprocess_files(SOURCE_DIRECTORY ${CUDF_SOURCE_DIR}/src FILES binaryop/jit/kernel.cu + transform/jit/masked_udf_kernel.cu transform/jit/kernel.cu rolling/jit/kernel.cu ) diff --git a/cpp/include/cudf/transform.hpp b/cpp/include/cudf/transform.hpp index f0c69549b0a..af2858d948e 100644 --- a/cpp/include/cudf/transform.hpp +++ b/cpp/include/cudf/transform.hpp @@ -54,6 +54,12 @@ std::unique_ptr transform( bool is_ptx, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr generalized_masked_op( + table_view const& data_view, + std::string const& binary_udf, + data_type output_type, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + /** * @brief Creates a null_mask from `input` by converting `NaN` to null and * preserving existing null values and also returns new null_count. diff --git a/cpp/src/transform/jit/masked_udf_kernel.cu b/cpp/src/transform/jit/masked_udf_kernel.cu new file mode 100644 index 00000000000..319ad730c53 --- /dev/null +++ b/cpp/src/transform/jit/masked_udf_kernel.cu @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +namespace cudf { +namespace transformation { +namespace jit { + +template +struct Masked { + T value; + bool valid; +}; + +template +__device__ auto make_args(cudf::size_type id, TypeIn in_ptr, MaskType in_mask, OffsetType in_offset) +{ + bool valid = in_mask ? cudf::bit_is_set(in_mask, in_offset + id) : true; + return cuda::std::make_tuple(in_ptr[id], valid); +} + +template +__device__ auto make_args(cudf::size_type id, + InType in_ptr, + MaskType in_mask, // in practice, always cudf::bitmask_type const* + OffsetType in_offset, // in practice, always cudf::size_type + Arguments... args) +{ + bool valid = in_mask ? cudf::bit_is_set(in_mask, in_offset + id) : true; + return cuda::std::tuple_cat(cuda::std::make_tuple(in_ptr[id], valid), make_args(id, args...)); +} + +template +__global__ void generic_udf_kernel(cudf::size_type size, + TypeOut* out_data, + bool* out_mask, + Arguments... args) +{ + int const tid = threadIdx.x; + int const blkid = blockIdx.x; + int const blksz = blockDim.x; + int const gridsz = gridDim.x; + int const start = tid + blkid * blksz; + int const step = blksz * gridsz; + + Masked output; + for (cudf::size_type i = start; i < size; i += step) { + auto func_args = cuda::std::tuple_cat( + cuda::std::make_tuple(&output.value), + make_args(i, args...) 
// passed int64*, bool*, int64, int64*, bool*, int64 + ); + cuda::std::apply(GENERIC_OP, func_args); + out_data[i] = output.value; + out_mask[i] = output.valid; + } +} + +} // namespace jit +} // namespace transformation +} // namespace cudf diff --git a/cpp/src/transform/transform.cpp b/cpp/src/transform/transform.cpp index 78bd74d1301..cf6bfa34b55 100644 --- a/cpp/src/transform/transform.cpp +++ b/cpp/src/transform/transform.cpp @@ -24,7 +24,7 @@ #include #include - +#include #include #include #include @@ -64,6 +64,80 @@ void unary_operation(mutable_column_view output, cudf::jit::get_data_ptr(input)); } +std::vector make_template_types(column_view outcol_view, table_view const& data_view) +{ + std::string mskptr_type = + cudf::jit::get_type_name(cudf::data_type(cudf::type_to_id())) + "*"; + std::string offset_type = + cudf::jit::get_type_name(cudf::data_type(cudf::type_to_id())); + + std::vector template_types; + template_types.reserve((3 * data_view.num_columns()) + 1); + + template_types.push_back(cudf::jit::get_type_name(outcol_view.type())); + for (auto const& col : data_view) { + template_types.push_back(cudf::jit::get_type_name(col.type()) + "*"); + template_types.push_back(mskptr_type); + template_types.push_back(offset_type); + } + return template_types; +} + +void generalized_operation(table_view const& data_view, + std::string const& udf, + data_type output_type, + mutable_column_view outcol_view, + mutable_column_view outmsk_view, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + auto const template_types = make_template_types(outcol_view, data_view); + + std::string generic_kernel_name = + jitify2::reflection::Template("cudf::transformation::jit::generic_udf_kernel") + .instantiate(template_types); + + std::string generic_cuda_source = cudf::jit::parse_single_function_ptx( + udf, "GENERIC_OP", cudf::jit::get_type_name(output_type), {0}); + + std::vector kernel_args; + kernel_args.reserve((data_view.num_columns() * 3) + 3); + 
+ cudf::size_type size = outcol_view.size(); + const void* outcol_ptr = cudf::jit::get_data_ptr(outcol_view); + const void* outmsk_ptr = cudf::jit::get_data_ptr(outmsk_view); + kernel_args.insert(kernel_args.begin(), {&size, &outcol_ptr, &outmsk_ptr}); + + std::vector data_ptrs; + std::vector mask_ptrs; + std::vector offsets; + + data_ptrs.reserve(data_view.num_columns()); + mask_ptrs.reserve(data_view.num_columns()); + offsets.reserve(data_view.num_columns()); + + auto const iters = thrust::make_zip_iterator( + thrust::make_tuple(data_ptrs.begin(), mask_ptrs.begin(), offsets.begin())); + + std::for_each(iters, iters + data_view.num_columns(), [&](auto const& tuple_vals) { + kernel_args.push_back(&thrust::get<0>(tuple_vals)); + kernel_args.push_back(&thrust::get<1>(tuple_vals)); + kernel_args.push_back(&thrust::get<2>(tuple_vals)); + }); + + std::transform(data_view.begin(), data_view.end(), iters, [&](column_view const& col) { + return thrust::make_tuple(cudf::jit::get_data_ptr(col), col.null_mask(), col.offset()); + }); + + cudf::jit::get_program_cache(*transform_jit_masked_udf_kernel_cu_jit) + .get_kernel(generic_kernel_name, + {}, + {{"transform/jit/operation-udf.hpp", generic_cuda_source}}, + {"-arch=sm_."}) + ->configure_1d_max_occupancy(0, 0, 0, stream.value()) + ->launch(kernel_args.data()); +} + } // namespace jit } // namespace transformation @@ -90,6 +164,24 @@ std::unique_ptr transform(column_view const& input, return output; } +std::unique_ptr generalized_masked_op(table_view const& data_view, + std::string const& udf, + data_type output_type, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + std::unique_ptr output = make_fixed_width_column(output_type, data_view.num_rows()); + std::unique_ptr output_mask = + make_fixed_width_column(cudf::data_type{cudf::type_id::BOOL8}, data_view.num_rows()); + + transformation::jit::generalized_operation( + data_view, udf, output_type, *output, *output_mask, stream, mr); + + auto 
final_output_mask = cudf::bools_to_mask(*output_mask); + output.get()->set_null_mask(std::move(*(final_output_mask.first))); + return output; +} + } // namespace detail std::unique_ptr transform(column_view const& input, @@ -102,4 +194,12 @@ std::unique_ptr transform(column_view const& input, return detail::transform(input, unary_udf, output_type, is_ptx, rmm::cuda_stream_default, mr); } +std::unique_ptr generalized_masked_op(table_view const& data_view, + std::string const& udf, + data_type output_type, + rmm::mr::device_memory_resource* mr) +{ + return detail::generalized_masked_op(data_view, udf, output_type, rmm::cuda_stream_default, mr); +} + } // namespace cudf diff --git a/python/cudf/cudf/_lib/cpp/transform.pxd b/python/cudf/cudf/_lib/cpp/transform.pxd index 484e3997f34..907a85ed593 100644 --- a/python/cudf/cudf/_lib/cpp/transform.pxd +++ b/python/cudf/cudf/_lib/cpp/transform.pxd @@ -34,6 +34,12 @@ cdef extern from "cudf/transform.hpp" namespace "cudf" nogil: bool is_ptx ) except + + cdef unique_ptr[column] generalized_masked_op( + const table_view& data_view, + string udf, + data_type output_type, + ) except + + cdef pair[unique_ptr[table], unique_ptr[column]] encode( table_view input ) except + diff --git a/python/cudf/cudf/_lib/transform.pyx b/python/cudf/cudf/_lib/transform.pyx index acb7c168a9c..351d185e81a 100644 --- a/python/cudf/cudf/_lib/transform.pyx +++ b/python/cudf/cudf/_lib/transform.pyx @@ -126,6 +126,30 @@ def transform(Column input, op): return Column.from_unique_ptr(move(c_output)) +def masked_udf(Table incols, op, output_type): + cdef table_view data_view = table_view_from_table( + incols, ignore_index=True) + cdef string c_str = op.encode("UTF-8") + cdef type_id c_tid + cdef data_type c_dtype + + c_tid = ( + SUPPORTED_NUMPY_TO_LIBCUDF_TYPES[ + output_type + ] + ) + c_dtype = data_type(c_tid) + + with nogil: + c_output = move(libcudf_transform.generalized_masked_op( + data_view, + c_str, + c_dtype, + )) + + return 
Column.from_unique_ptr(move(c_output)) + + def table_encode(Table input): cdef table_view c_input = table_view_from_table( input, ignore_index=True) From 51b4fc95289125835af60e0c9addbe2ac162611a Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 21 Sep 2021 10:59:39 -0700 Subject: [PATCH 33/34] cpp cleanup --- cpp/src/transform/transform.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/transform/transform.cpp b/cpp/src/transform/transform.cpp index cf6bfa34b55..5230b853a79 100644 --- a/cpp/src/transform/transform.cpp +++ b/cpp/src/transform/transform.cpp @@ -25,6 +25,7 @@ #include #include + #include #include #include From 71c71b83d5d24939b752bc662b77f772a894f1b8 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 28 Sep 2021 08:59:59 -0700 Subject: [PATCH 34/34] address reviews --- python/cudf/cudf/core/udf/lowering.py | 5 +- python/cudf/cudf/core/udf/pipeline.py | 69 +++++++++---------- python/cudf/cudf/core/udf/typing.py | 4 +- python/cudf/cudf/tests/test_udf_masked_ops.py | 34 ++++----- 4 files changed, 51 insertions(+), 61 deletions(-) diff --git a/python/cudf/cudf/core/udf/lowering.py b/python/cudf/cudf/core/udf/lowering.py index 3c0dfbb300a..3986abc2bf0 100644 --- a/python/cudf/cudf/core/udf/lowering.py +++ b/python/cudf/cudf/core/udf/lowering.py @@ -9,11 +9,10 @@ ) from numba.extending import lower_builtin, types +from cudf.core.udf import api +from cudf.core.udf._ops import arith_ops, comparison_ops from cudf.core.udf.typing import MaskedType, NAType -from . 
import api -from ._ops import arith_ops, comparison_ops - @cuda_lowering_registry.lower_constant(NAType) def constant_na(context, builder, ty, pyval): diff --git a/python/cudf/cudf/core/udf/pipeline.py b/python/cudf/cudf/core/udf/pipeline.py index 50f625937fa..a410a03be41 100644 --- a/python/cudf/cudf/core/udf/pipeline.py +++ b/python/cudf/cudf/core/udf/pipeline.py @@ -13,8 +13,6 @@ MASK_BITSIZE = np.dtype("int32").itemsize * 8 precompiled: cachetools.LRUCache = cachetools.LRUCache(maxsize=32) -cuda.jit(device=True)(pack_return) - @annotate("NUMBA JIT", color="green", domain="cudf_python") def get_udf_return_type(func, dtypes): @@ -131,9 +129,7 @@ def _kernel(retval, {input_columns}, {input_offsets}, size): def _define_function(df, scalar_return=False): # Create argument list for kernel - input_columns = ", ".join( - [f"input_col_{i}" for i in range(len(df.columns))] - ) + input_columns = ", ".join([f"input_col_{i}" for i in range(len(df._data))]) input_offsets = ", ".join([f"offset_{i}" for i in range(len(df._data))]) @@ -187,36 +183,37 @@ def compile_or_get(df, f): ) if precompiled.get(cache_key) is not None: kernel, scalar_return_type = precompiled[cache_key] - else: - - numba_return_type = get_udf_return_type(f, df.dtypes) - _is_scalar_return = not isinstance(numba_return_type, MaskedType) - scalar_return_type = ( - numba_return_type - if _is_scalar_return - else numba_return_type.value_type - ) + return kernel, scalar_return_type + + numba_return_type = get_udf_return_type(f, df.dtypes) + _is_scalar_return = not isinstance(numba_return_type, MaskedType) + scalar_return_type = ( + numba_return_type + if _is_scalar_return + else numba_return_type.value_type + ) - sig = construct_signature(df, scalar_return_type) - f_ = cuda.jit(device=True)(f) - - # Dict of 'local' variables into which `_kernel` is defined - local_exec_context = {} - global_exec_context = { - "f_": f_, - "cuda": cuda, - "Masked": Masked, - "mask_get": mask_get, - "pack_return": pack_return, - } 
- exec( - _define_function(df, scalar_return=_is_scalar_return), - global_exec_context, - local_exec_context, - ) - # The python function definition representing the kernel - _kernel = local_exec_context["_kernel"] - kernel = cuda.jit(sig)(_kernel) - precompiled[cache_key] = (kernel, scalar_return_type) + sig = construct_signature(df, scalar_return_type) + f_ = cuda.jit(device=True)(f) + + # Dict of 'local' variables into which `_kernel` is defined + local_exec_context = {} + global_exec_context = { + "f_": f_, + "cuda": cuda, + "Masked": Masked, + "mask_get": mask_get, + "pack_return": pack_return, + } + exec( + _define_function(df, scalar_return=_is_scalar_return), + global_exec_context, + local_exec_context, + ) + # The python function definition representing the kernel + _kernel = local_exec_context["_kernel"] + kernel = cuda.jit(sig)(_kernel) + scalar_return_type = numpy_support.as_dtype(scalar_return_type) + precompiled[cache_key] = (kernel, scalar_return_type) - return kernel, numpy_support.as_dtype(scalar_return_type) + return kernel, scalar_return_type diff --git a/python/cudf/cudf/core/udf/typing.py b/python/cudf/cudf/core/udf/typing.py index cfddfed6f22..042d97db838 100644 --- a/python/cudf/cudf/core/udf/typing.py +++ b/python/cudf/cudf/core/udf/typing.py @@ -17,8 +17,8 @@ from numba.cuda.cudadecl import registry as cuda_decl_registry from pandas._libs.missing import NAType as _NAType -from . 
import api -from ._ops import arith_ops, comparison_ops +from cudf.core.udf import api +from cudf.core.udf._ops import arith_ops, comparison_ops class MaskedType(types.Type): diff --git a/python/cudf/cudf/tests/test_udf_masked_ops.py b/python/cudf/cudf/tests/test_udf_masked_ops.py index d5fee232666..9a48a98fbc2 100644 --- a/python/cudf/cudf/tests/test_udf_masked_ops.py +++ b/python/cudf/cudf/tests/test_udf_masked_ops.py @@ -96,15 +96,13 @@ def func_gdf(x): operator.truediv, operator.floordiv, }: - with pytest.xfail(): - # The following tests cases yield undefined behavior: - # - truediv(x, False) because its dividing by zero - # - floordiv(x, False) because its dividing by zero - # - mod(x, False) because its mod by zero, - # - pow(x, False) because we have an NA in the series and pandas - # insists that (NA**0 == 1) where we do not - run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) - return + # The following tests cases yield undefined behavior: + # - truediv(x, False) because its dividing by zero + # - floordiv(x, False) because its dividing by zero + # - mod(x, False) because its mod by zero, + # - pow(x, False) because we have an NA in the series and pandas + # insists that (NA**0 == 1) where we do not + pytest.skip() run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) @@ -123,13 +121,11 @@ def func_gdf(x): gdf = cudf.DataFrame({"data": data}) if constant == 1 and op is operator.pow: - with pytest.xfail(): - # The following tests cases yield differing results from pandas: - # - 1**NA - # - True**NA - # both due to pandas insisting that this is equal to 1. - run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) - return + # The following tests cases yield differing results from pandas: + # - 1**NA + # - True**NA + # both due to pandas insisting that this is equal to 1. 
+ pytest.skip() run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) @@ -146,10 +142,8 @@ def func_gdf(x): gdf = cudf.DataFrame({"data": data}) if 1 in gdf["data"] and op is operator.pow: - with pytest.xfail(): - # In pandas, 1**NA == 1. - run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False) - return + # In pandas, 1**NA == 1. + pytest.skip() run_masked_udf_test(func_pdf, func_gdf, gdf, check_dtype=False)