diff --git a/cpp/src/groupby/sort/group_tdigest.cu b/cpp/src/groupby/sort/group_tdigest.cu index d2bc0f42bda..146a6a8c31c 100644 --- a/cpp/src/groupby/sort/group_tdigest.cu +++ b/cpp/src/groupby/sort/group_tdigest.cu @@ -57,7 +57,10 @@ struct make_centroid { centroid operator() __device__(size_type index) { - return {static_cast(col.element(index)), 1, col.is_valid(index)}; + auto const is_valid = col.is_valid(index); + auto const mean = is_valid ? static_cast(col.element(index)) : 0.0; + auto const weight = is_valid ? 1.0 : 0.0; + return {mean, weight, is_valid}; } }; @@ -237,6 +240,7 @@ __device__ double scale_func_k1(double quantile, double delta_norm) * @param group_cluster_wl Output. The set of cluster weight limits for each group. * @param group_num_clusters Output. The number of output clusters for each input group. * @param group_cluster_offsets Offsets per-group to the start of it's clusters + * @param has_nulls Whether or not the input contains nulls * */ template @@ -247,24 +251,33 @@ __global__ void generate_cluster_limits_kernel(int delta_, CumulativeWeight cumulative_weight, double* group_cluster_wl, size_type* group_num_clusters, - offset_type const* group_cluster_offsets) + offset_type const* group_cluster_offsets, + bool has_nulls) { int const tid = threadIdx.x + blockIdx.x * blockDim.x; auto const group_index = tid; if (group_index >= num_groups) { return; } // we will generate at most delta clusters. - double const delta = static_cast(delta_); - double const delta_norm = delta / (2.0 * M_PI); - double const total_weight = total_weight_[group_index]; - group_num_clusters[group_index] = 0; - // a group with nothing in it. - if (total_weight <= 0) { return; } + double const delta = static_cast(delta_); + double const delta_norm = delta / (2.0 * M_PI); + double const total_weight = total_weight_[group_index]; // start at the correct place based on our cluster offset. double* cluster_wl = group_cluster_wl ? group_cluster_wl + group_cluster_offsets[group_index] : nullptr; + // a group with nothing in it. + group_num_clusters[group_index] = 0; + if (total_weight <= 0) { + // if the input contains nulls we can potentially have a group that generates no + // clusters because -all- of the input values are null. in that case, the reduce_by_key call + // in the tdigest generation step will need a location to store the unused reduction value for + // that group of nulls. these "stubs" will be postprocessed out afterwards. + if (has_nulls) { group_num_clusters[group_index] = 1; } + return; + } + double cur_limit = 0.0; double cur_weight = 0.0; double next_limit = -1.0; @@ -349,6 +362,7 @@ __global__ void generate_cluster_limits_kernel(int delta_, * stream that falls before our current cluster limit * @param total_weight A functor which returns the expected total weight for * the entire stream of input values for the specified group. + * @param has_nulls Whether or not the input data contains nulls * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory * @@ -362,6 +376,7 @@ generate_group_cluster_info(int delta, NearestWeight nearest_weight, TotalWeightIter total_weight, CumulativeWeight cumulative_weight, + bool has_nulls, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { @@ -379,10 +394,11 @@ generate_group_cluster_info(int delta, cumulative_weight, nullptr, group_num_clusters.begin(), - nullptr); + nullptr, + has_nulls); // generate group cluster offsets (where the clusters for a given group start and end) - auto group_cluster_offsets = cudf::make_fixed_width_column( + auto group_cluster_offsets = cudf::make_numeric_column( data_type{type_id::INT32}, num_groups + 1, mask_state::UNALLOCATED, stream, mr); auto cluster_size = cudf::detail::make_counting_transform_iterator( 0, [group_num_clusters = group_num_clusters.begin(), num_groups] __device__(size_type index) { @@ -408,13 +424,96 @@ generate_group_cluster_info(int delta, cumulative_weight, group_cluster_wl.begin(), group_num_clusters.begin(), - group_cluster_offsets->view().begin()); + group_cluster_offsets->view().begin(), + has_nulls); return {std::move(group_cluster_wl), std::move(group_cluster_offsets), static_cast(total_clusters)}; } +std::unique_ptr build_output_column(size_type num_rows, + std::unique_ptr&& means, + std::unique_ptr&& weights, + std::unique_ptr&& offsets, + std::unique_ptr&& min_col, + std::unique_ptr&& max_col, + bool has_nulls, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + // whether or not this weight is a stub + auto is_stub_weight = [weights = weights->view().begin()] __device__(size_type i) { + return weights[i] == 0; + }; + // whether or not this particular tdigest is a stub + auto is_stub_digest = [offsets = offsets->view().begin(), is_stub_weight] __device__( + size_type i) { return is_stub_weight(offsets[i]) ? 1 : 0; }; + + size_type const num_stubs = [&]() { + if (!has_nulls) { return 0; } + auto iter = cudf::detail::make_counting_transform_iterator(0, is_stub_digest); + return thrust::reduce(rmm::exec_policy(stream), iter, iter + num_rows); + }(); + + // if there are no stub tdigests, we can return immediately. + if (num_stubs == 0) { + return cudf::detail::tdigest::make_tdigest_column(num_rows, + std::move(means), + std::move(weights), + std::move(offsets), + std::move(min_col), + std::move(max_col), + stream, + mr); + } + + // otherwise we need to strip out the stubs. + auto remove_stubs = [&](column_view const& col, size_type num_stubs) { + auto result = cudf::make_numeric_column( + data_type{type_id::FLOAT64}, col.size() - num_stubs, mask_state::UNALLOCATED, stream, mr); + thrust::remove_copy_if(rmm::exec_policy(stream), + col.begin(), + col.end(), + thrust::make_counting_iterator(0), + result->mutable_view().begin(), + is_stub_weight); + return result; + }; + // remove from the means and weights column + auto _means = remove_stubs(*means, num_stubs); + auto _weights = remove_stubs(*weights, num_stubs); + + // adjust offsets. + rmm::device_uvector sizes(num_rows, stream); + thrust::transform(rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(0) + num_rows, + sizes.begin(), + [offsets = offsets->view().begin()] __device__(size_type i) { + return offsets[i + 1] - offsets[i]; + }); + auto iter = cudf::detail::make_counting_transform_iterator( + 0, [sizes = sizes.begin(), is_stub_digest, num_rows] __device__(size_type i) { + return i == num_rows || is_stub_digest(i) ? 0 : sizes[i]; + }); + thrust::exclusive_scan(rmm::exec_policy(stream), + iter, + iter + num_rows + 1, + offsets->mutable_view().begin(), + 0); + + // assemble final column + return cudf::detail::tdigest::make_tdigest_column(num_rows, + std::move(_means), + std::move(_weights), + std::move(offsets), + std::move(min_col), + std::move(max_col), + stream, + mr); +} + /** * @brief Compute a column of tdigests. * @@ -436,6 +535,7 @@ generate_group_cluster_info(int delta, * @param group_cluster_wl Cluster weight limits for each group. * @param group_cluster_offsets R-value reference of offsets into the cluster weight limits. * @param total_clusters Total number of clusters in all groups. + * @param has_nulls Whether or not the input contains nulls * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory * @@ -451,10 +551,11 @@ std::unique_ptr compute_tdigests(int delta, rmm::device_uvector const& group_cluster_wl, std::unique_ptr&& group_cluster_offsets, size_type total_clusters, + bool has_nulls, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - // the output for each group is column of data that represents the tdigest. since we want 1 row + // the output for each group is a column of data that represents the tdigest. since we want 1 row // per group, each row will be a list the length of the tdigest for that group. so our output // column is of the form: // struct { @@ -506,13 +607,15 @@ std::unique_ptr compute_tdigests(int delta, }); // mean and weight data - auto centroid_means = cudf::make_fixed_width_column( + auto centroid_means = cudf::make_numeric_column( data_type{type_id::FLOAT64}, total_clusters, mask_state::UNALLOCATED, stream, mr); - auto centroid_weights = cudf::make_fixed_width_column( + auto centroid_weights = cudf::make_numeric_column( data_type{type_id::FLOAT64}, total_clusters, mask_state::UNALLOCATED, stream, mr); // reduce the centroids down by key. cudf::mutable_column_view mean_col(*centroid_means); cudf::mutable_column_view weight_col(*centroid_weights); + + // reduce the centroids into the clusters auto output = thrust::make_zip_iterator(thrust::make_tuple( mean_col.begin(), weight_col.begin(), thrust::make_discard_iterator())); auto const num_values = std::distance(centroids_begin, centroids_end); @@ -526,15 +629,15 @@ std::unique_ptr compute_tdigests(int delta, merge_centroids{}); // create final tdigest column - auto const num_output_rows = group_cluster_offsets->size() - 1; - return cudf::detail::tdigest::make_tdigest_column(num_output_rows, - std::move(centroid_means), - std::move(centroid_weights), - std::move(group_cluster_offsets), - std::move(min_col), - std::move(max_col), - stream, - mr); + return build_output_column(group_cluster_offsets->size() - 1, + std::move(centroid_means), + std::move(centroid_weights), + std::move(group_cluster_offsets), + std::move(min_col), + std::move(max_col), + has_nulls, + stream, + mr); } // retrieve total weight of scalar inputs by group index @@ -583,6 +686,7 @@ struct typed_group_tdigest { nearest_value_scalar_weights{}, total_weight, cumulative_scalar_weight{group_offsets, group_labels}, + col.null_count() > 0, stream, mr); @@ -591,9 +695,9 @@ struct typed_group_tdigest { auto d_col = cudf::column_device_view::create(col); // compute min and max columns - auto min_col = cudf::make_fixed_width_column( + auto min_col = cudf::make_numeric_column( data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr); - auto max_col = cudf::make_fixed_width_column( + auto max_col = cudf::make_numeric_column( data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr); thrust::transform( rmm::exec_policy(stream), @@ -617,6 +721,7 @@ struct typed_group_tdigest { group_cluster_wl, std::move(group_cluster_offsets), total_clusters, + col.null_count() > 0, stream, mr); } @@ -734,7 +839,7 @@ std::unique_ptr group_merge_tdigest(column_view const& input, }); // generate min and max values - auto merged_min_col = cudf::make_fixed_width_column( + auto merged_min_col = cudf::make_numeric_column( data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr); auto min_iter = thrust::make_transform_iterator( thrust::make_zip_iterator(thrust::make_tuple(tdv.min_begin(), tdv.size_begin())), @@ -748,7 +853,7 @@ std::unique_ptr group_merge_tdigest(column_view const& input, thrust::equal_to{}, // key equality check thrust::minimum{}); - auto merged_max_col = cudf::make_fixed_width_column( + auto merged_max_col = cudf::make_numeric_column( data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr); auto max_iter = thrust::make_transform_iterator( thrust::make_zip_iterator(thrust::make_tuple(tdv.max_begin(), tdv.size_begin())), @@ -798,7 +903,7 @@ std::unique_ptr group_merge_tdigest(column_view const& input, // generate cumulative weights auto merged_weights = merged->get_column(1).view(); - auto cumulative_weights = cudf::make_fixed_width_column( + auto cumulative_weights = cudf::make_numeric_column( data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED); auto keys = cudf::detail::make_counting_transform_iterator( 0, @@ -846,6 +951,7 @@ std::unique_ptr group_merge_tdigest(column_view const& input, group_labels, group_offsets.data(), {tdigest_offsets.begin(), static_cast(tdigest_offsets.size())}}, + false, stream, mr); @@ -869,6 +975,7 @@ std::unique_ptr group_merge_tdigest(column_view const& input, group_cluster_wl, std::move(group_cluster_offsets), total_clusters, + false, stream, mr); } diff --git a/cpp/tests/groupby/tdigest_tests.cu b/cpp/tests/groupby/tdigest_tests.cu index 2cac931f7f5..d913d3b432c 100644 --- a/cpp/tests/groupby/tdigest_tests.cu +++ b/cpp/tests/groupby/tdigest_tests.cu @@ -358,9 +358,9 @@ struct TDigestTest : public cudf::test::BaseFixture { TEST_F(TDigestTest, EmptyMixed) { - cudf::test::fixed_width_column_wrapper values{{123456.78, 10.0, 20.0, 30.0}, - {1, 0, 1, 0}}; - cudf::test::strings_column_wrapper keys{"b", "a", "c", "d"}; + cudf::test::fixed_width_column_wrapper values{ + {123456.78, 10.0, 20.0, 25.0, 30.0, 40.0, 50.0, 60.0, 70.0}, {1, 0, 0, 1, 0, 0, 1, 1, 0}}; + cudf::test::strings_column_wrapper keys{"b", "a", "c", "c", "d", "d", "e", "e", "f"}; auto const delta = 1000; cudf::table_view t({keys}); @@ -374,7 +374,9 @@ TEST_F(TDigestTest, EmptyMixed) using FCW = cudf::test::fixed_width_column_wrapper; auto expected = make_expected_tdigest_column({{FCW{}, FCW{}, 0, 0}, {FCW{123456.78}, FCW{1.0}, 123456.78, 123456.78}, - {FCW{20.0}, FCW{1.0}, 20.0, 20.0}, + {FCW{25.0}, FCW{1.0}, 25.0, 25.0}, + {FCW{}, FCW{}, 0, 0}, + {FCW{50.0, 60.0}, FCW{1.0, 1.0}, 50.0, 60.0}, {FCW{}, FCW{}, 0, 0}}); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result.second[0].results[0], *expected); @@ -938,8 +940,10 @@ TEST_F(TDigestMergeTest, Empty) TEST_F(TDigestMergeTest, EmptyGroups) { - cudf::test::fixed_width_column_wrapper values_b{126, 15, 1, 99, 67, 55, 2}; - cudf::test::fixed_width_column_wrapper values_d{100, 200, 300, 400, 500, 600, 700}; + cudf::test::fixed_width_column_wrapper values_b{{126, 15, 1, 99, 67, 55, 2}, + {1, 0, 0, 1, 1, 1, 1}}; + cudf::test::fixed_width_column_wrapper values_d{{100, 200, 300, 400, 500, 600, 700}, + {1, 1, 1, 1, 1, 1, 0}}; cudf::test::fixed_width_column_wrapper keys{0, 0, 0, 0, 0, 0, 0}; int const delta = 1000; @@ -971,11 +975,10 @@ TEST_F(TDigestMergeTest, EmptyGroups) using FCW = cudf::test::fixed_width_column_wrapper; cudf::test::fixed_width_column_wrapper expected_means{ - 1, 2, 15, 55, 67, 99, 100, 126, 200, 300, 400, 500, 600, 700}; - cudf::test::fixed_width_column_wrapper expected_weights{ - 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}; + 2, 55, 67, 99, 100, 126, 200, 300, 400, 500, 600}; + cudf::test::fixed_width_column_wrapper expected_weights{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}; auto expected = make_expected_tdigest_column( - {{expected_means, expected_weights, 1, 700}, {FCW{}, FCW{}, 0, 0}, {FCW{}, FCW{}, 0, 0}}); + {{expected_means, expected_weights, 2, 600}, {FCW{}, FCW{}, 0, 0}, {FCW{}, FCW{}, 0, 0}}); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result.second[0].results[0]); }