Support structs column in min, max, argmin and argmax groupby aggregate() and scan() #9545

Merged
merged 53 commits into branch-21.12 from min_max_for_structs on Nov 10, 2021
Changes from 10 commits
53 commits
1d1a35b
Add condition to fallback to sort-based aggregates if the input value…
ttnghia Oct 26, 2021
f36abed
Rename function
ttnghia Oct 26, 2021
b0b4535
Implement argmin/argmax for structs
ttnghia Oct 26, 2021
6387ae2
Add comments and cleanup
ttnghia Oct 27, 2021
fddbac9
Cleanup
ttnghia Oct 27, 2021
b86665e
Simplify code
ttnghia Oct 27, 2021
96683ac
Fix null order
ttnghia Oct 28, 2021
b26cc93
Add unit tests
ttnghia Oct 28, 2021
e5d6475
Merge branch 'branch-21.12' into min_max_for_structs
ttnghia Oct 28, 2021
895cabb
Rename functor
ttnghia Oct 28, 2021
bfc0585
Move `has_struct` condition check into `can_use_has_groupby`
ttnghia Oct 28, 2021
cc5c8c4
Rename structs and function
ttnghia Oct 29, 2021
d9703c8
Merge branch 'branch-21.12' into min_max_for_structs
ttnghia Oct 29, 2021
cd7f7a4
Fix SFINAE condition, and extract a struct functor
ttnghia Nov 1, 2021
f7d1b3e
Implement groupby scan for struct min/max
ttnghia Nov 1, 2021
5d77d4f
Implement unit tests
ttnghia Nov 1, 2021
bce93e4
Rewrite SFINAE style
ttnghia Nov 1, 2021
b1b916f
Add missing `mr` parameter
ttnghia Nov 1, 2021
75e201f
Refactor `row_arg_minmax`
ttnghia Nov 2, 2021
08a60f8
Adopt "dispatch to static invoke" pattern
ttnghia Nov 2, 2021
3a0c580
Rename functors to better expressive names
ttnghia Nov 2, 2021
42e8f23
Merge branch 'branch-21.12' into min_max_for_structs
ttnghia Nov 2, 2021
f4c53c2
Fix formatting style
ttnghia Nov 2, 2021
b1a3628
Fix formatting style
ttnghia Nov 2, 2021
57858d5
Merge branch 'branch-21.12' into min_max_for_structs
ttnghia Nov 3, 2021
d868d66
Remove redundant template argument
ttnghia Nov 3, 2021
94eed99
Rewrite SFINAE into specialization
ttnghia Nov 3, 2021
cb6fb5f
Attempt to patch thrust
ttnghia Nov 4, 2021
e6885d4
Revert "Attempt to patch thrust"
ttnghia Nov 4, 2021
d4d4644
Add declaration for new internal APIs
ttnghia Nov 4, 2021
ad7998f
Call the specialized functions for struct type values
ttnghia Nov 4, 2021
0c2b0b4
Add new .cu files
ttnghia Nov 4, 2021
f063e26
Remove `struct_view` specialization
ttnghia Nov 4, 2021
9456ea3
Implement `struct_view` specialization
ttnghia Nov 4, 2021
43be509
Fix output order
ttnghia Nov 4, 2021
7cee90c
Fix EXPECT conditions
ttnghia Nov 4, 2021
534648a
Merge branch 'branch-21.12' into min_max_for_structs
ttnghia Nov 4, 2021
fee55e3
Refactor `row_operators.cuh`
ttnghia Nov 5, 2021
2fde89a
Fix function name typo
ttnghia Nov 5, 2021
ac9c603
Remove redundant header
ttnghia Nov 5, 2021
f5d27ae
Revert "Refactor `row_operators.cuh`"
ttnghia Nov 5, 2021
7a7c706
Prevent functor code from inlining
ttnghia Nov 5, 2021
2a6a106
Revert "Remove redundant header"
ttnghia Nov 5, 2021
88ef471
Revert "Remove `struct_view` specialization"
ttnghia Nov 5, 2021
494af01
Revert "Add new .cu files"
ttnghia Nov 5, 2021
3833bbc
Revert "Call the specialized functions for struct type values"
ttnghia Nov 5, 2021
61c774a
Revert "Add declaration for new internal APIs"
ttnghia Nov 5, 2021
a5ab52d
Fix function name
ttnghia Nov 5, 2021
731426a
Fix CMakeList.txt
ttnghia Nov 5, 2021
49259f3
Remove files
ttnghia Nov 5, 2021
cbf386f
Add groupby struct benchmark
ttnghia Nov 8, 2021
8b3f72e
Implement benchmark
ttnghia Nov 8, 2021
cdfc602
Unify 2 functions into a template function
ttnghia Nov 8, 2021
10 changes: 9 additions & 1 deletion cpp/src/groupby/groupby.cu
@@ -67,13 +67,21 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::disp
{
using namespace cudf::structs::detail;

// Currently, structs are not supported in hash-based aggregates.
// Therefore, if any request contains structs then we must fall back to sort-based aggregates.
// TODO: Support structs in hash-based aggregates.
auto const has_struct =
std::any_of(requests.begin(), requests.end(), [](aggregation_request const& r) {
return r.values.type().id() == type_id::STRUCT;
});

// If sort groupby has been called once on this groupby object, then
// always use sort groupby from now on. Because once keys are sorted,
// all the aggs that can be done by hash groupby are efficiently done by
// sort groupby as well.
// Only use hash groupby if the keys aren't sorted and all requests can be
// satisfied with a hash implementation
if (_keys_are_sorted == sorted::NO and not _helper and
if (_keys_are_sorted == sorted::NO and not _helper and (not has_struct) and
detail::hash::can_use_hash_groupby(_keys, requests)) {
// Optionally flatten nested key columns.
auto flattened = flatten_nested_columns(_keys, {}, {}, column_nullability::FORCE);
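
With this change, any aggregation request whose values column has STRUCT type is routed to the sort-based groupby implementation. A minimal caller-side sketch (not part of the PR; it reuses the cudf::test column wrappers that the new tests below rely on):

#include <cudf/aggregation.hpp>
#include <cudf/groupby.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf_test/column_wrapper.hpp>

#include <vector>

// Illustrative only: a MIN aggregation over a STRUCT values column, which now
// falls back to the sort-based path because has_struct evaluates to true.
void struct_min_groupby_sketch()
{
  auto keys  = cudf::test::fixed_width_column_wrapper<int32_t>{1, 2, 1, 2};
  auto child = cudf::test::strings_column_wrapper{"b", "c", "a", "d"};
  auto vals  = cudf::test::structs_column_wrapper{{child}};

  cudf::groupby::groupby gb_obj(cudf::table_view{{keys}});

  std::vector<cudf::groupby::aggregation_request> requests(1);
  requests[0].values = vals;
  requests[0].aggregations.push_back(cudf::make_min_aggregation<cudf::groupby_aggregation>());

  // Hash groupby is skipped; sort groupby produces one minimum struct row per key.
  auto [unique_keys, results] = gb_obj.aggregate(requests);
}
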
195 changes: 152 additions & 43 deletions cpp/src/groupby/sort/group_single_pass_reduction_util.cuh
@@ -21,7 +21,10 @@
#include <cudf/column/column_view.hpp>
#include <cudf/detail/aggregation/aggregation.cuh>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/structs/utilities.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/detail/valid_if.cuh>
#include <cudf/table/row_operators.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

@@ -38,40 +38,66 @@ namespace groupby {
namespace detail {

/**
* @brief ArgMin binary operator with index values into input column.
* @brief Binary operator with index values into the input column.
*
* @tparam T Type of the underlying column. Must support '<' operator.
*/
template <typename T>
struct ArgMin {
template <typename T, bool has_nulls, bool arg_min, typename Enable = void>
struct arg_minmax_fn {
column_device_view const d_col;
CUDA_DEVICE_CALLABLE auto operator()(size_type const& lhs, size_type const& rhs) const
CUDA_DEVICE_CALLABLE auto operator()(size_type const& lhs_idx, size_type const& rhs_idx) const
{
// The extra bounds checking is due to issue github.com/rapidsai/cudf/9156 and
// github.com/NVIDIA/thrust/issues/1525
// where invalid random values may be passed here by thrust::reduce_by_key
if (lhs < 0 || lhs >= d_col.size() || d_col.is_null(lhs)) { return rhs; }
if (rhs < 0 || rhs >= d_col.size() || d_col.is_null(rhs)) { return lhs; }
return d_col.element<T>(lhs) < d_col.element<T>(rhs) ? lhs : rhs;
if (lhs_idx < 0 || lhs_idx >= d_col.size() || (has_nulls && d_col.is_null_nocheck(lhs_idx))) {
return rhs_idx;
}
if (rhs_idx < 0 || rhs_idx >= d_col.size() || (has_nulls && d_col.is_null_nocheck(rhs_idx))) {
return lhs_idx;
}

// Return `lhs_idx` iff:
// row(lhs_idx) < row(rhs_idx) and finding ArgMin, or
// row(lhs_idx) >= row(rhs_idx) and finding ArgMax.
auto const less = d_col.element<T>(lhs_idx) < d_col.element<T>(rhs_idx);
return less == arg_min ? lhs_idx : rhs_idx;
}
};

/**
* @brief ArgMax binary operator with index values into input column.
* @brief Binary operator ArgMin/ArgMax with index values into the input table.
*
* @tparam T Type of the underlying column. Must support '<' operator.
* @tparam T Type of the underlying data. This specialization is the fallback for cases where T
* does not support the '<' operator.
*/
template <typename T>
struct ArgMax {
column_device_view const d_col;
CUDA_DEVICE_CALLABLE auto operator()(size_type const& lhs, size_type const& rhs) const
template <typename T, bool has_nulls, bool arg_min>
struct arg_minmax_fn<T,
has_nulls,
arg_min,
std::enable_if_t<!cudf::is_relationally_comparable<T, T>()>> {
size_type const num_rows;
row_lexicographic_comparator<has_nulls> const comp;

arg_minmax_fn(size_type const num_rows_,
table_device_view const& table_,
null_order const* null_precedence)
: num_rows(num_rows_), comp(table_, table_, nullptr, null_precedence)
{
}

CUDA_DEVICE_CALLABLE auto operator()(size_type lhs_idx, size_type rhs_idx) const
{
// The extra bounds checking is due to issue github.com/rapidsai/cudf/9156 and
// github.com/NVIDIA/thrust/issues/1525
// where invalid random values may be passed here by thrust::reduce_by_key
if (lhs < 0 || lhs >= d_col.size() || d_col.is_null(lhs)) { return rhs; }
if (rhs < 0 || rhs >= d_col.size() || d_col.is_null(rhs)) { return lhs; }
return d_col.element<T>(rhs) < d_col.element<T>(lhs) ? lhs : rhs;
if (lhs_idx < 0 || lhs_idx >= num_rows) { return rhs_idx; }
if (rhs_idx < 0 || rhs_idx >= num_rows) { return lhs_idx; }

// Return `lhs_idx` iff:
// row(lhs_idx) < row(rhs_idx) and finding ArgMin, or
// row(lhs_idx) >= row(rhs_idx) and finding ArgMax.
return comp(lhs_idx, rhs_idx) == arg_min ? lhs_idx : rhs_idx;
}
};
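
Both the primary template and this table-based specialization share one selection rule: return the left index exactly when (row(lhs) < row(rhs)) matches the arg_min flag. A small host-side illustration of that trick (plain C++, not part of the PR):

#include <cassert>
#include <vector>

// Illustrative only: the same `less == arg_min` selection applied to a host vector.
template <bool arg_min>
int select_index(std::vector<int> const& v, int lhs, int rhs)
{
  bool const less = v[lhs] < v[rhs];
  return less == arg_min ? lhs : rhs;  // ArgMin keeps the smaller row, ArgMax keeps the larger
}

int main()
{
  std::vector<int> v{7, 3, 9};
  assert(select_index<true>(v, 0, 1) == 1);   // ArgMin: v[1] = 3 is the smaller value
  assert(select_index<false>(v, 0, 2) == 2);  // ArgMax: v[2] = 9 is the larger value
  return 0;
}
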

@@ -124,7 +124,7 @@ struct reduce_functor {
template <aggregation::Kind K>
struct reduce_functor {
template <typename T>
static constexpr bool is_supported()
static constexpr bool is_natively_supported()
{
switch (K) {
case aggregation::SUM:
@@ -140,15 +140,14 @@ }
}

template <typename T>
std::enable_if_t<is_supported<T>(), std::unique_ptr<column>> operator()(
std::enable_if_t<is_natively_supported<T>(), std::unique_ptr<column>> operator()(
column_view const& values,
size_type num_groups,
cudf::device_span<size_type const> group_labels,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
using DeviceType = device_storage_type_t<T>;
using OpType = cudf::detail::corresponding_operator_t<K>;
using ResultType = cudf::detail::target_type_t<T, K>;
using ResultDType = device_storage_type_t<ResultType>;

Expand All @@ -161,53 +189,134 @@ struct reduce_functor {

if (values.is_empty()) { return result; }

auto resultview = mutable_column_device_view::create(result->mutable_view(), stream);
auto valuesview = column_device_view::create(values, stream);
if constexpr (K == aggregation::ARGMAX || K == aggregation::ARGMIN) {
using OpType =
std::conditional_t<(K == aggregation::ARGMAX), ArgMax<DeviceType>, ArgMin<DeviceType>>;
// Perform segmented reduction.
auto const do_reduction = [&](auto const& inp_iter, auto const& out_iter, auto const& binop) {
thrust::reduce_by_key(rmm::exec_policy(stream),
group_labels.data(),
group_labels.data() + group_labels.size(),
thrust::make_counting_iterator<ResultType>(0),
inp_iter,
thrust::make_discard_iterator(),
resultview->begin<ResultType>(),
thrust::equal_to<ResultType>{},
OpType{*valuesview});
} else {
auto init = OpType::template identity<DeviceType>();
auto begin = cudf::detail::make_counting_transform_iterator(
0, null_replaced_value_accessor{*valuesview, init, values.has_nulls()});
thrust::reduce_by_key(rmm::exec_policy(stream),
group_labels.data(),
group_labels.data() + group_labels.size(),
begin,
thrust::make_discard_iterator(),
resultview->begin<ResultDType>(),
out_iter,
thrust::equal_to<size_type>{},
OpType{});
binop);
};

auto const d_values_ptr = column_device_view::create(values, stream);
auto const result_begin = result->mutable_view().template begin<ResultDType>();

if constexpr (K == aggregation::ARGMAX || K == aggregation::ARGMIN) {
auto const count_iter = thrust::make_counting_iterator<ResultType>(0);
if (values.has_nulls()) {
using OpType = arg_minmax_fn<T, true, K == aggregation::ARGMIN>;
do_reduction(count_iter, result_begin, OpType{*d_values_ptr});
} else {
using OpType = arg_minmax_fn<T, false, K == aggregation::ARGMIN>;
do_reduction(count_iter, result_begin, OpType{*d_values_ptr});
}
} else {
using OpType = cudf::detail::corresponding_operator_t<K>;
auto init = OpType::template identity<DeviceType>();
auto begin = cudf::detail::make_counting_transform_iterator(
0, null_replaced_value_accessor{*d_values_ptr, init, values.has_nulls()});
do_reduction(begin, result_begin, OpType{});
}

if (values.has_nulls()) {
rmm::device_uvector<bool> validity(num_groups, stream);
do_reduction(cudf::detail::make_validity_iterator(*d_values_ptr),
validity.begin(),
thrust::logical_or<bool>{});

auto [null_mask, null_count] = cudf::detail::valid_if(
validity.begin(), validity.end(), thrust::identity<bool>{}, stream, mr);
result->set_null_mask(std::move(null_mask));
result->set_null_count(null_count);
}
return result;
}

// This specialization handles the cases when the input values type:
// - Is not natively supported, and
// - Is struct type, and
// - Aggregation is either ARGMIN or ARGMAX.
template <typename T>
std::enable_if_t<not is_natively_supported<T>() and std::is_same_v<T, struct_view> and
(K == aggregation::ARGMIN or K == aggregation::ARGMAX),
std::unique_ptr<column>>
operator()(column_view const& values,
size_type num_groups,
cudf::device_span<size_type const> group_labels,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// This is expected to be size_type.
using ResultType = cudf::detail::target_type_t<T, K>;

auto result = make_fixed_width_column(
data_type{type_to_id<ResultType>()}, num_groups, mask_state::UNALLOCATED, stream, mr);

if (values.is_empty()) { return result; }

// When finding ARGMIN, we need to consider nulls as larger than non-null elements.
// The opposite is true when finding ARGMAX.
auto const null_precedence =
(K == aggregation::ARGMIN) ? null_order::AFTER : null_order::BEFORE;
auto const flattened_values = structs::detail::flatten_nested_columns(
table_view{{values}}, {}, std::vector<null_order>{null_precedence});
auto const d_flattened_values_ptr = table_device_view::create(flattened_values, stream);
auto const flattened_null_precedences =
(K == aggregation::ARGMIN)
? cudf::detail::make_device_uvector_async(flattened_values.null_orders(), stream)
: rmm::device_uvector<null_order>(0, stream);

// Perform segmented reduction to find ARGMIN/ARGMAX.
auto const do_reduction = [&](auto const& inp_iter, auto const& out_iter, auto const& binop) {
thrust::reduce_by_key(rmm::exec_policy(stream),
group_labels.data(),
group_labels.data() + group_labels.size(),
cudf::detail::make_validity_iterator(*valuesview),
inp_iter,
thrust::make_discard_iterator(),
validity.begin(),
out_iter,
thrust::equal_to<size_type>{},
thrust::logical_or<bool>{});
binop);
};

auto const count_iter = thrust::make_counting_iterator<ResultType>(0);
auto const result_begin = result->mutable_view().template begin<ResultType>();
if (values.has_nulls()) {
auto const op = arg_minmax_fn<T, true, K == aggregation::ARGMIN>(
values.size(), *d_flattened_values_ptr, flattened_null_precedences.data());
do_reduction(count_iter, result_begin, op);

// Generate bitmask for the output by segmented reduction of the input bitmask.
auto const d_values_ptr = column_device_view::create(values, stream);
auto validity = rmm::device_uvector<bool>(num_groups, stream);
do_reduction(cudf::detail::make_validity_iterator(*d_values_ptr),
validity.begin(),
thrust::logical_or<bool>{});

auto [null_mask, null_count] = cudf::detail::valid_if(
validity.begin(), validity.end(), thrust::identity<bool>{}, stream, mr);
result->set_null_mask(std::move(null_mask));
result->set_null_count(null_count);
} else {
auto const op = arg_minmax_fn<T, false, K == aggregation::ARGMIN>(
values.size(), *d_flattened_values_ptr, flattened_null_precedences.data());
do_reduction(count_iter, result_begin, op);
}

return result;
}
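
// Illustration (not part of the PR diff): why null_precedence flips between ARGMIN and ARGMAX.
// With struct rows [null, {"a"}, {"b"}] and ARGMIN, nulls must compare as the largest rows
// (null_order::AFTER) so the reduction lands on index 1 ({"a"}) rather than the null row; for
// ARGMAX they must compare as the smallest rows (null_order::BEFORE) so it lands on index 2
// ({"b"}). All-null groups still produce null outputs because the output bitmask is built
// separately by the segmented logical_or over the input validity above.
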

// Throw exception if the input values type:
// - Is not natively supported, and
// - Is not struct type, or is struct type but the aggregation is neither ARGMIN nor ARGMAX.
template <typename T, typename... Args>
std::enable_if_t<not is_supported<T>(), std::unique_ptr<column>> operator()(Args&&... args)
std::enable_if_t<not is_natively_supported<T>() and
(not std::is_same_v<T, struct_view> or
(K != aggregation::ARGMIN and K != aggregation::ARGMAX)),
std::unique_ptr<column>>
operator()(Args&&... args)
{
CUDF_FAIL("Unsupported type-agg combination");
}
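
For context (this call site is not part of the diff shown here): the sort-based groupby reaches reduce_functor through cudf::type_dispatcher, so for a STRUCT values column the dispatcher instantiates T = cudf::struct_view and overload resolution picks the struct overload added above. A rough, abbreviated sketch with a hypothetical wrapper name:

#include "group_single_pass_reduction_util.cuh"

#include <cudf/column/column.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/utilities/span.hpp>
#include <cudf/utilities/type_dispatcher.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>

#include <memory>

// Illustrative only: dispatch the ARGMIN reduction on the runtime type of `values`.
std::unique_ptr<cudf::column> group_argmin_sketch(cudf::column_view const& values,
                                                  cudf::size_type num_groups,
                                                  cudf::device_span<cudf::size_type const> group_labels,
                                                  rmm::cuda_stream_view stream,
                                                  rmm::mr::device_memory_resource* mr)
{
  return cudf::type_dispatcher(values.type(),
                               cudf::groupby::detail::reduce_functor<cudf::aggregation::ARGMIN>{},
                               values,
                               num_groups,
                               group_labels,
                               stream,
                               mr);
}

In the existing sort groupby code the resulting positions are then translated back through the key sort order; that step is omitted from this sketch.
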
74 changes: 73 additions & 1 deletion cpp/tests/groupby/argmax_tests.cpp
@@ -32,7 +32,7 @@ struct groupby_argmax_test : public cudf::test::BaseFixture {
};
using K = int32_t;

TYPED_TEST_CASE(groupby_argmax_test, cudf::test::FixedWidthTypes);
TYPED_TEST_SUITE(groupby_argmax_test, cudf::test::FixedWidthTypes);

TYPED_TEST(groupby_argmax_test, basic)
{
@@ -182,6 +182,78 @@ TEST_F(groupby_dictionary_argmax_test, basic)
force_use_sort_impl::YES);
}

struct groupby_argmax_struct_test : public cudf::test::BaseFixture {
};

TEST_F(groupby_argmax_struct_test, basic)
{
auto const keys = fixed_width_column_wrapper<int32_t>{1, 2, 3, 1, 2, 2, 1, 3, 3, 2};
auto const vals = [] {
auto child1 =
strings_column_wrapper{"año", "bit", "₹1", "aaa", "zit", "bat", "aab", "$1", "€1", "wut"};
auto child2 = fixed_width_column_wrapper<int32_t>{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
return structs_column_wrapper{{child1, child2}};
}();

auto const expect_keys = fixed_width_column_wrapper<int32_t>{1, 2, 3};
auto const expect_indices = fixed_width_column_wrapper<int32_t>{0, 4, 2};

auto agg = cudf::make_argmax_aggregation<groupby_aggregation>();
test_single_agg(keys, vals, expect_keys, expect_indices, std::move(agg));
}

TEST_F(groupby_argmax_struct_test, slice_input)
{
constexpr int32_t dont_care{1};
auto const keys_original = fixed_width_column_wrapper<int32_t>{
dont_care, dont_care, 1, 2, 3, 1, 2, 2, 1, 3, 3, 2, dont_care};
auto const vals_original = [] {
auto child1 = strings_column_wrapper{"dont_care",
"dont_care",
"año",
"bit",
"₹1",
"aaa",
"zit",
"bat",
"aab",
"$1",
"€1",
"wut",
"dont_care"};
auto child2 = fixed_width_column_wrapper<int32_t>{
dont_care, dont_care, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, dont_care};
return structs_column_wrapper{{child1, child2}};
}();

auto const keys = cudf::slice(keys_original, {2, 12})[0];
auto const vals = cudf::slice(vals_original, {2, 12})[0];
auto const expect_keys = fixed_width_column_wrapper<int32_t>{1, 2, 3};
auto const expect_indices = fixed_width_column_wrapper<int32_t>{0, 4, 2};

auto agg = cudf::make_argmax_aggregation<groupby_aggregation>();
test_single_agg(keys, vals, expect_keys, expect_indices, std::move(agg));
}

TEST_F(groupby_argmax_struct_test, null_keys_and_values)
{
constexpr int32_t null{0};
auto const keys =
fixed_width_column_wrapper<int32_t>{{1, 2, 3, 1, 2, 2, 1, null, 3, 2, 4}, null_at(7)};
auto const vals = [] {
auto child1 = strings_column_wrapper{
"año", "bit", "₹1", "aaa", "zit", "" /*NULL*/, "" /*NULL*/, "$1", "€1", "wut", "" /*NULL*/};
auto child2 = fixed_width_column_wrapper<int32_t>{9, 8, 7, 6, 5, null, null, 2, 1, 0, null};
return structs_column_wrapper{{child1, child2}, nulls_at({5, 6, 10})};
}();

auto const expect_keys = fixed_width_column_wrapper<int32_t>{{1, 2, 3, 4}, no_nulls()};
auto const expect_indices = fixed_width_column_wrapper<int32_t>{{0, 4, 2, null}, null_at(3)};

auto agg = cudf::make_argmax_aggregation<groupby_aggregation>();
test_single_agg(keys, vals, expect_keys, expect_indices, std::move(agg));
}

} // namespace test
} // namespace cudf
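
The tests above exercise aggregate(); the PR title also covers groupby scan (commit f7d1b3e "Implement groupby scan for struct min/max"), whose tests are not included in this 10-commit view. A caller-side sketch of such a scan request (not part of the diff; it assumes the standard libcudf scan API and uses hypothetical names):

#include <cudf/aggregation.hpp>
#include <cudf/groupby.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf_test/column_wrapper.hpp>

#include <vector>

// Illustrative only: a groupby MIN scan over a STRUCT values column.
void struct_min_groupby_scan_sketch()
{
  auto keys  = cudf::test::fixed_width_column_wrapper<int32_t>{1, 1, 2, 2};
  auto child = cudf::test::strings_column_wrapper{"b", "a", "d", "c"};
  auto vals  = cudf::test::structs_column_wrapper{{child}};

  cudf::groupby::groupby gb_obj(cudf::table_view{{keys}});

  std::vector<cudf::groupby::scan_request> requests(1);
  requests[0].values = vals;
  requests[0].aggregations.push_back(
    cudf::make_min_aggregation<cudf::groupby_scan_aggregation>());

  // Yields a running minimum within each group, e.g. {"b", "a"} for key 1 and {"d", "c"} for key 2.
  auto [grouped_keys, results] = gb_obj.scan(requests);
}
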
