Fix edge case in tdigest scalar generation for groups containing all nulls. #9551

Merged
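This change makes null elements contribute a weight of 0 when centroids are built, so a group whose values are all null ends up with a total weight of 0. Such a group now emits a single zero-weight "stub" cluster, which gives the reduce_by_key in the tdigest generation step a slot to write into; the stubs are stripped back out when the final tdigest column is assembled. Below is a minimal host-side sketch of that idea (illustrative only: it uses one weighted-mean "cluster" per group instead of real tdigest clustering, and none of the names are cudf APIs):

```cpp
// Sketch of the null-handling idea in this PR (not cudf code): null elements get
// weight 0, an all-null group produces a zero-weight stub entry so the reduction
// has somewhere to write, and stubs are skipped when producing the output.
#include <cstdio>
#include <vector>

struct Centroid { double mean; double weight; };

int main()
{
  // two groups: group 0 has one valid value, group 1 is all nulls
  std::vector<double> values  = {10.0, 20.0, 30.0};
  std::vector<bool>   valid   = {true, false, false};
  std::vector<int>    offsets = {0, 1, 3};  // group boundaries

  std::vector<Centroid> clusters;  // reduced clusters, stubs included

  for (std::size_t g = 0; g + 1 < offsets.size(); ++g) {
    double total_weight = 0.0, weighted_sum = 0.0;
    for (int i = offsets[g]; i < offsets[g + 1]; ++i) {
      double const w = valid[i] ? 1.0 : 0.0;  // nulls get weight 0 (the core fix)
      total_weight += w;
      weighted_sum += w * values[i];
    }
    if (total_weight > 0) {
      clusters.push_back({weighted_sum / total_weight, total_weight});
    } else {
      clusters.push_back({0.0, 0.0});  // stub for an all-null group
    }
  }

  // post-process: zero-weight stubs are dropped, so all-null groups come out empty
  for (std::size_t g = 0; g < clusters.size(); ++g) {
    if (clusters[g].weight > 0) {
      std::printf("group %zu: mean=%.1f weight=%.1f\n", g, clusters[g].mean, clusters[g].weight);
    } else {
      std::printf("group %zu: empty tdigest\n", g);
    }
  }
  return 0;
}
```

In the actual change, the stub is emitted by generate_cluster_limits_kernel when has_nulls is set and a group's total weight is zero, and build_output_column later removes any cluster whose weight is 0.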
149 changes: 129 additions & 20 deletions cpp/src/groupby/sort/group_tdigest.cu
@@ -57,7 +57,8 @@ struct make_centroid {

centroid operator() __device__(size_type index)
{
return {static_cast<double>(col.element<T>(index)), 1, col.is_valid(index)};
return {
static_cast<double>(col.element<T>(index)), col.is_valid(index) ? 1 : 0, col.is_valid(index)};
}
};

@@ -237,6 +238,7 @@ __device__ double scale_func_k1(double quantile, double delta_norm)
* @param group_cluster_wl Output. The set of cluster weight limits for each group.
* @param group_num_clusters Output. The number of output clusters for each input group.
* @param group_cluster_offsets Offsets per-group to the start of its clusters
* @param has_nulls Whether or not the input contains nulls
*
*/
template <typename TotalWeightIter, typename NearestWeightFunc, typename CumulativeWeight>
@@ -247,24 +249,33 @@ __global__ void generate_cluster_limits_kernel(int delta_,
CumulativeWeight cumulative_weight,
double* group_cluster_wl,
size_type* group_num_clusters,
offset_type const* group_cluster_offsets)
offset_type const* group_cluster_offsets,
bool has_nulls)
{
int const tid = threadIdx.x + blockIdx.x * blockDim.x;
auto const group_index = tid;
if (group_index >= num_groups) { return; }

// we will generate at most delta clusters.
double const delta = static_cast<double>(delta_);
double const delta_norm = delta / (2.0 * M_PI);
double const total_weight = total_weight_[group_index];
group_num_clusters[group_index] = 0;
// a group with nothing in it.
if (total_weight <= 0) { return; }
double const delta = static_cast<double>(delta_);
double const delta_norm = delta / (2.0 * M_PI);
double const total_weight = total_weight_[group_index];

// start at the correct place based on our cluster offset.
double* cluster_wl =
group_cluster_wl ? group_cluster_wl + group_cluster_offsets[group_index] : nullptr;

// a group with nothing in it.
group_num_clusters[group_index] = 0;
if (total_weight <= 0) {
// if the input contains nulls we can potentially have a group that generates no
// clusters because -all- of the input values are null. in that case, the reduce_by_key call
// in the tdigest generation step will need a location to store the unused reduction value for
// that group of nulls. these "stubs" will be postprocessed out afterwards.
if (has_nulls) { group_num_clusters[group_index] = 1; }
return;
}

double cur_limit = 0.0;
double cur_weight = 0.0;
double next_limit = -1.0;
@@ -349,6 +360,7 @@ __global__ void generate_cluster_limits_kernel(int delta_,
* stream that falls before our current cluster limit
* @param total_weight A functor which returns the expected total weight for
* the entire stream of input values for the specified group.
* @param has_nulls Whether or not the input data contains nulls
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*
@@ -362,6 +374,7 @@ generate_group_cluster_info(int delta,
NearestWeight nearest_weight,
TotalWeightIter total_weight,
CumulativeWeight cumulative_weight,
bool has_nulls,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
@@ -379,7 +392,8 @@ generate_group_cluster_info(int delta,
cumulative_weight,
nullptr,
group_num_clusters.begin(),
nullptr);
nullptr,
has_nulls);

// generate group cluster offsets (where the clusters for a given group start and end)
auto group_cluster_offsets = cudf::make_fixed_width_column(
@@ -408,13 +422,100 @@ generate_group_cluster_info(int delta,
cumulative_weight,
group_cluster_wl.begin(),
group_num_clusters.begin(),
group_cluster_offsets->view().begin<offset_type>());
group_cluster_offsets->view().begin<offset_type>(),
has_nulls);

return {std::move(group_cluster_wl),
std::move(group_cluster_offsets),
static_cast<size_type>(total_clusters)};
}
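generate_group_cluster_info runs the kernel twice: the first pass only fills group_num_clusters (the weight-limit and offset pointers are null), the counts are then turned into group_cluster_offsets, and the second pass writes the per-group cluster weight limits into place. A stripped-down host-side sketch of that count/scan/fill pattern (plain C++ standing in for the device code; the data and the use of std::inclusive_scan are illustrative):

```cpp
// Count clusters per group, scan the counts into offsets, then fill (sketch only).
#include <cstdio>
#include <numeric>
#include <vector>

int main()
{
  // per-group cluster counts, as produced by the first kernel pass
  std::vector<int> group_num_clusters = {4, 1, 3};

  // scan the counts into offsets: {0, 4, 5, 8}
  std::vector<int> group_cluster_offsets(group_num_clusters.size() + 1, 0);
  std::inclusive_scan(group_num_clusters.begin(),
                      group_num_clusters.end(),
                      group_cluster_offsets.begin() + 1);

  // the second pass writes weight limits for group g starting at group_cluster_offsets[g]
  int const total_clusters = group_cluster_offsets.back();
  std::vector<double> group_cluster_wl(total_clusters, 0.0);

  std::printf("total clusters: %d\n", total_clusters);  // prints: total clusters: 8
  return 0;
}
```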

std::unique_ptr<column> build_output_column(size_type num_rows,
std::unique_ptr<column>&& means,
std::unique_ptr<column>&& weights,
std::unique_ptr<column>&& offsets,
std::unique_ptr<column>&& min_col,
std::unique_ptr<column>&& max_col,
bool has_nulls,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// whether or not this weight is a stub
auto is_stub_weight = [weights = weights->view().begin<double>()] __device__(size_type i) {
return weights[i] == 0;
};
// whether or not this particular tdigest is a stub
auto is_stub_digest = [offsets = offsets->view().begin<offset_type>(), is_stub_weight] __device__(
size_type i) { return is_stub_weight(offsets[i]) ? 1 : 0; };

size_type const num_stubs = [&]() {
if (!has_nulls) { return 0; }
auto iter = cudf::detail::make_counting_transform_iterator(0, is_stub_digest);
return thrust::reduce(rmm::exec_policy(stream), iter, iter + num_rows);
}();

// if there are no stub tdigests, we can return immediately.
if (num_stubs == 0) {
return cudf::detail::tdigest::make_tdigest_column(num_rows,
std::move(means),
std::move(weights),
std::move(offsets),
std::move(min_col),
std::move(max_col),
stream,
mr);
}

// otherwise we need to strip out the stubs.

// remove from the means and weights column
auto _means = cudf::make_fixed_width_column(
data_type{type_id::FLOAT64}, means->size() - num_stubs, mask_state::UNALLOCATED, stream, mr);
thrust::remove_copy_if(rmm::exec_policy(stream),
means->view().begin<double>(),
means->view().end<double>(),
thrust::make_counting_iterator(0),
_means->mutable_view().begin<double>(),
is_stub_weight);
auto _weights = cudf::make_fixed_width_column(
data_type{type_id::FLOAT64}, weights->size() - num_stubs, mask_state::UNALLOCATED, stream, mr);
thrust::remove_copy_if(rmm::exec_policy(stream),
weights->view().begin<double>(),
weights->view().end<double>(),
thrust::make_counting_iterator(0),
_weights->mutable_view().begin<double>(),
is_stub_weight);

// adjust offsets.
rmm::device_uvector<offset_type> sizes(num_rows, stream);
thrust::transform(rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(0) + num_rows,
sizes.begin(),
[offsets = offsets->view().begin<offset_type>()] __device__(size_type i) {
return offsets[i + 1] - offsets[i];
});
auto iter = cudf::detail::make_counting_transform_iterator(
0, [sizes = sizes.begin(), is_stub_digest, num_rows] __device__(size_type i) {
return i == num_rows || is_stub_digest(i) ? 0 : sizes[i];
});
thrust::exclusive_scan(rmm::exec_policy(stream),
iter,
iter + num_rows + 1,
offsets->mutable_view().begin<offset_type>(),
0);

// assemble final column
return cudf::detail::tdigest::make_tdigest_column(num_rows,
std::move(_means),
std::move(_weights),
std::move(offsets),
std::move(min_col),
std::move(max_col),
stream,
mr);
}
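The offset adjustment at the end of build_output_column boils down to: compute each group's size from the old offsets, force the size of any stub group to zero, and exclusive-scan the result back into the offsets column. A host-side illustration of just that step (std::exclusive_scan standing in for the thrust::exclusive_scan call; the data is made up):

```cpp
// Rebuild offsets after dropping stub clusters (illustrative, not cudf code).
#include <cstdio>
#include <numeric>
#include <vector>

int main()
{
  int const num_rows = 4;
  std::vector<int> offsets  = {0, 3, 5, 6, 9};              // old offsets for 4 tdigests
  std::vector<bool> is_stub = {false, false, true, false};  // group 2 is an all-null stub

  // sizes has num_rows + 1 entries so the scan also produces the final offset;
  // the trailing entry stays 0, mirroring the i == num_rows case in the iterator above
  std::vector<int> sizes(num_rows + 1, 0);
  for (int i = 0; i < num_rows; ++i) {
    sizes[i] = is_stub[i] ? 0 : offsets[i + 1] - offsets[i];
  }

  // scan {3, 2, 0, 3, 0} back into the offsets: {0, 3, 5, 5, 8}
  std::exclusive_scan(sizes.begin(), sizes.end(), offsets.begin(), 0);

  for (int o : offsets) { std::printf("%d ", o); }  // prints: 0 3 5 5 8
  std::printf("\n");
  return 0;
}
```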

/**
* @brief Compute a column of tdigests.
*
@@ -436,6 +537,7 @@ generate_group_cluster_info(int delta,
* @param group_cluster_wl Cluster weight limits for each group.
* @param group_cluster_offsets R-value reference of offsets into the cluster weight limits.
* @param total_clusters Total number of clusters in all groups.
* @param has_nulls Whether or not the input contains nulls
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*
@@ -451,10 +553,11 @@ std::unique_ptr<column> compute_tdigests(int delta,
rmm::device_uvector<double> const& group_cluster_wl,
std::unique_ptr<column>&& group_cluster_offsets,
size_type total_clusters,
bool has_nulls,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// the output for each group is column of data that represents the tdigest. since we want 1 row
// the output for each group is a column of data that represents the tdigest. since we want 1 row
// per group, each row will be a list the length of the tdigest for that group. so our output
// column is of the form:
// struct {
@@ -513,6 +616,8 @@ std::unique_ptr<column> compute_tdigests(int delta,
// reduce the centroids down by key.
cudf::mutable_column_view mean_col(*centroid_means);
cudf::mutable_column_view weight_col(*centroid_weights);

// reduce the centroids into the clusters
auto output = thrust::make_zip_iterator(thrust::make_tuple(
mean_col.begin<double>(), weight_col.begin<double>(), thrust::make_discard_iterator()));
auto const num_values = std::distance(centroids_begin, centroids_end);
@@ -526,15 +631,15 @@ std::unique_ptr<column> compute_tdigests(int delta,
merge_centroids{});

// create final tdigest column
auto const num_output_rows = group_cluster_offsets->size() - 1;
return cudf::detail::tdigest::make_tdigest_column(num_output_rows,
std::move(centroid_means),
std::move(centroid_weights),
std::move(group_cluster_offsets),
std::move(min_col),
std::move(max_col),
stream,
mr);
return build_output_column(group_cluster_offsets->size() - 1,
std::move(centroid_means),
std::move(centroid_weights),
std::move(group_cluster_offsets),
std::move(min_col),
std::move(max_col),
has_nulls,
stream,
mr);
}

// retrieve total weight of scalar inputs by group index
@@ -583,6 +688,7 @@ struct typed_group_tdigest {
nearest_value_scalar_weights{},
total_weight,
cumulative_scalar_weight{group_offsets, group_labels},
col.null_count() > 0,
stream,
mr);

@@ -617,6 +723,7 @@ struct typed_group_tdigest {
group_cluster_wl,
std::move(group_cluster_offsets),
total_clusters,
col.null_count() > 0,
stream,
mr);
}
@@ -846,6 +953,7 @@ std::unique_ptr<column> group_merge_tdigest(column_view const& input,
group_labels,
group_offsets.data(),
{tdigest_offsets.begin<offset_type>(), static_cast<size_t>(tdigest_offsets.size())}},
false,
stream,
mr);

@@ -869,6 +977,7 @@ std::unique_ptr<column> group_merge_tdigest(column_view const& input,
group_cluster_wl,
std::move(group_cluster_offsets),
total_clusters,
false,
stream,
mr);
}
23 changes: 13 additions & 10 deletions cpp/tests/groupby/tdigest_tests.cu
@@ -358,9 +358,9 @@ struct TDigestTest : public cudf::test::BaseFixture {

TEST_F(TDigestTest, EmptyMixed)
{
cudf::test::fixed_width_column_wrapper<double> values{{123456.78, 10.0, 20.0, 30.0},
{1, 0, 1, 0}};
cudf::test::strings_column_wrapper keys{"b", "a", "c", "d"};
cudf::test::fixed_width_column_wrapper<double> values{
{123456.78, 10.0, 20.0, 25.0, 30.0, 40.0, 50.0, 60.0, 70.0}, {1, 0, 0, 1, 0, 0, 1, 1, 0}};
cudf::test::strings_column_wrapper keys{"b", "a", "c", "c", "d", "d", "e", "e", "f"};

auto const delta = 1000;
cudf::table_view t({keys});
@@ -374,7 +374,9 @@ TEST_F(TDigestTest, EmptyMixed)
using FCW = cudf::test::fixed_width_column_wrapper<double>;
auto expected = make_expected_tdigest_column({{FCW{}, FCW{}, 0, 0},
{FCW{123456.78}, FCW{1.0}, 123456.78, 123456.78},
{FCW{20.0}, FCW{1.0}, 20.0, 20.0},
{FCW{25.0}, FCW{1.0}, 25.0, 25.0},
{FCW{}, FCW{}, 0, 0},
{FCW{50.0, 60.0}, FCW{1.0, 1.0}, 50.0, 60.0},
{FCW{}, FCW{}, 0, 0}});

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result.second[0].results[0], *expected);
@@ -938,8 +940,10 @@ TEST_F(TDigestMergeTest, Empty)

TEST_F(TDigestMergeTest, EmptyGroups)
{
cudf::test::fixed_width_column_wrapper<double> values_b{126, 15, 1, 99, 67, 55, 2};
cudf::test::fixed_width_column_wrapper<double> values_d{100, 200, 300, 400, 500, 600, 700};
cudf::test::fixed_width_column_wrapper<double> values_b{{126, 15, 1, 99, 67, 55, 2},
{1, 0, 0, 1, 1, 1, 1}};
cudf::test::fixed_width_column_wrapper<double> values_d{{100, 200, 300, 400, 500, 600, 700},
{1, 1, 1, 1, 1, 1, 0}};
cudf::test::fixed_width_column_wrapper<int> keys{0, 0, 0, 0, 0, 0, 0};
int const delta = 1000;

@@ -971,11 +975,10 @@ TEST_F(TDigestMergeTest, EmptyGroups)

using FCW = cudf::test::fixed_width_column_wrapper<double>;
cudf::test::fixed_width_column_wrapper<double> expected_means{
1, 2, 15, 55, 67, 99, 100, 126, 200, 300, 400, 500, 600, 700};
cudf::test::fixed_width_column_wrapper<double> expected_weights{
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
2, 55, 67, 99, 100, 126, 200, 300, 400, 500, 600};
cudf::test::fixed_width_column_wrapper<double> expected_weights{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
auto expected = make_expected_tdigest_column(
{{expected_means, expected_weights, 1, 700}, {FCW{}, FCW{}, 0, 0}, {FCW{}, FCW{}, 0, 0}});
{{expected_means, expected_weights, 2, 600}, {FCW{}, FCW{}, 0, 0}, {FCW{}, FCW{}, 0, 0}});

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result.second[0].results[0]);
}