Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move rank scan implementations from scan_inclusive.cu to rank_scan.cu #9351

Merged
merged 3 commits into from
Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ add_library(cudf
src/reductions/nth_element.cu
src/reductions/product.cu
src/reductions/reductions.cpp
src/reductions/scan/rank_scan.cu
src/reductions/scan/scan.cpp
src/reductions/scan/scan_exclusive.cu
src/reductions/scan/scan_inclusive.cu
Expand Down
24 changes: 24 additions & 0 deletions cpp/include/cudf/detail/scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,29 @@ std::unique_ptr<column> scan_inclusive(column_view const& input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Generate row ranks for a column
*
* @param order_by Input column to generate ranks for
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return rank values
*/
std::unique_ptr<column> inclusive_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Generate row dense ranks for a column
*
* @param order_by Input column to generate ranks for
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return rank values
*/
std::unique_ptr<column> inclusive_dense_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

} // namespace detail
} // namespace cudf
130 changes: 130 additions & 0 deletions cpp/src/reductions/scan/rank_scan.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <structs/utilities.hpp>

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/utilities/device_operators.cuh>
#include <cudf/table/row_operators.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/scan.h>
#include <thrust/tabulate.h>

namespace cudf {
namespace detail {
namespace {

/**
* @brief generate row ranks or dense ranks using a row comparison then scan the results
*
* @tparam has_nulls if the order_by column has nulls
* @tparam value_resolver flag value resolver with boolean first and row number arguments
* @tparam scan_operator scan function ran on the flag values
* @param order_by input column to generate ranks for
* @param resolver flag value resolver
* @param scan_op scan operation ran on the flag results
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return std::unique_ptr<column> rank values
*/
template <bool has_nulls, typename value_resolver, typename scan_operator>
std::unique_ptr<column> rank_generator(column_view const& order_by,
value_resolver resolver,
scan_operator scan_op,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto const superimposed = structs::detail::superimpose_parent_nulls(order_by, stream, mr);
table_view const order_table{{std::get<0>(superimposed)}};
auto const flattener = cudf::structs::detail::flatten_nested_columns(
order_table, {}, {}, structs::detail::column_nullability::MATCH_INCOMING);
auto const d_flat_order = table_device_view::create(std::get<0>(flattener), stream);
row_equality_comparator<has_nulls> comparator(*d_flat_order, *d_flat_order, true);
auto ranks = make_fixed_width_column(data_type{type_to_id<size_type>()},
order_table.num_rows(),
mask_state::UNALLOCATED,
stream,
mr);
auto mutable_ranks = ranks->mutable_view();

thrust::tabulate(rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
[comparator, resolver] __device__(size_type row_index) {
return resolver(row_index == 0 || !comparator(row_index, row_index - 1),
row_index);
});

thrust::inclusive_scan(rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
mutable_ranks.begin<size_type>(),
scan_op);
return ranks;
}

} // namespace

std::unique_ptr<column> inclusive_dense_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(!cudf::structs::detail::is_or_has_nested_lists(order_by),
"Unsupported list type in dense_rank scan.");
if (has_nested_nulls(table_view{{order_by}})) {
return rank_generator<true>(
order_by,
[] __device__(bool equality, auto row_index) { return equality; },
DeviceSum{},
stream,
mr);
}
return rank_generator<false>(
order_by,
[] __device__(bool equality, auto row_index) { return equality; },
DeviceSum{},
stream,
mr);
}

std::unique_ptr<column> inclusive_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(!cudf::structs::detail::is_or_has_nested_lists(order_by),
"Unsupported list type in rank scan.");
if (has_nested_nulls(table_view{{order_by}})) {
return rank_generator<true>(
order_by,
[] __device__(bool equality, auto row_index) { return equality ? row_index + 1 : 0; },
DeviceMax{},
stream,
mr);
}
return rank_generator<false>(
order_by,
[] __device__(bool equality, auto row_index) { return equality ? row_index + 1 : 0; },
DeviceMax{},
stream,
mr);
}

} // namespace detail
} // namespace cudf
12 changes: 12 additions & 0 deletions cpp/src/reductions/scan/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ std::unique_ptr<column> scan(column_view const& input,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

if (agg->kind == aggregation::RANK) {
CUDF_EXPECTS(inclusive == scan_type::INCLUSIVE,
"Unsupported rank aggregation operator for exclusive scan");
return inclusive_rank_scan(input, rmm::cuda_stream_default, mr);
}
if (agg->kind == aggregation::DENSE_RANK) {
CUDF_EXPECTS(inclusive == scan_type::INCLUSIVE,
"Unsupported dense rank aggregation operator for exclusive scan");
return inclusive_dense_rank_scan(input, rmm::cuda_stream_default, mr);
}

return inclusive == scan_type::EXCLUSIVE
? detail::scan_exclusive(input, agg, null_handling, rmm::cuda_stream_default, mr)
: detail::scan_inclusive(input, agg, null_handling, rmm::cuda_stream_default, mr);
Expand Down
18 changes: 3 additions & 15 deletions cpp/src/reductions/scan/scan.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,16 @@ rmm::device_buffer mask_scan(column_view const& input_view,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

std::unique_ptr<column> inclusive_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

std::unique_ptr<column> inclusive_dense_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

template <template <typename> typename DispatchFn>
std::unique_ptr<column> scan_agg_dispatch(const column_view& input,
std::unique_ptr<aggregation> const& agg,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (agg->kind != aggregation::RANK && agg->kind != aggregation::DENSE_RANK) {
CUDF_EXPECTS(
is_numeric(input.type()) || is_compound(input.type()) || is_fixed_point(input.type()),
"Unexpected non-numeric or non-string type.");
}
CUDF_EXPECTS(
is_numeric(input.type()) || is_compound(input.type()) || is_fixed_point(input.type()),
"Unexpected non-numeric or non-string type.");

switch (agg->kind) {
case aggregation::SUM:
Expand All @@ -70,8 +60,6 @@ std::unique_ptr<column> scan_agg_dispatch(const column_view& input,
if (is_fixed_point(input.type())) CUDF_FAIL("decimal32/64 cannot support product scan");
return type_dispatcher<dispatch_storage_type>(
input.type(), DispatchFn<DeviceProduct>(), input, null_handling, stream, mr);
case aggregation::RANK: return inclusive_rank_scan(input, stream, mr);
case aggregation::DENSE_RANK: return inclusive_dense_rank_scan(input, stream, mr);
default: CUDF_FAIL("Unsupported aggregation operator for scan");
}
}
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/reductions/scan/scan_exclusive.cu
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ std::unique_ptr<column> scan_exclusive(const column_view& input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(agg->kind != aggregation::RANK && agg->kind != aggregation::DENSE_RANK,
"Unsupported rank aggregation operator for exclusive scan");
auto output = scan_agg_dispatch<scan_dispatcher>(input, agg, null_handling, stream, mr);

if (null_handling == null_policy::EXCLUDE) {
Expand Down
102 changes: 1 addition & 101 deletions cpp/src/reductions/scan/scan_inclusive.cu
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,16 @@
#include "scan.cuh"

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/copy.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/null_mask.hpp>
#include <cudf/reduction.hpp>
#include <cudf/strings/detail/gather.cuh>
#include <cudf/table/row_operators.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <structs/utilities.hpp>

#include <thrust/scan.h>

namespace cudf {
Expand Down Expand Up @@ -197,101 +192,8 @@ struct scan_dispatcher {
}
};

/**
* @brief generate row ranks or dense ranks using a row comparison then scan the results
*
* @tparam has_nulls if the order_by column has nulls
* @tparam value_resolver flag value resolver with boolean first and row number arguments
* @tparam scan_operator scan function ran on the flag values
* @param order_by input column to generate ranks for
* @param resolver flag value resolver
* @param scan_op scan operation ran on the flag results
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return std::unique_ptr<column> rank values
*/
template <bool has_nulls, typename value_resolver, typename scan_operator>
std::unique_ptr<column> rank_generator(column_view const& order_by,
value_resolver resolver,
scan_operator scan_op,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto const superimposed = structs::detail::superimpose_parent_nulls(order_by, stream, mr);
table_view const order_table{{std::get<0>(superimposed)}};
auto const flattener = cudf::structs::detail::flatten_nested_columns(
order_table, {}, {}, structs::detail::column_nullability::MATCH_INCOMING);
auto const d_flat_order = table_device_view::create(std::get<0>(flattener), stream);
row_equality_comparator<has_nulls> comparator(*d_flat_order, *d_flat_order, true);
auto ranks = make_fixed_width_column(data_type{type_to_id<size_type>()},
order_table.num_rows(),
mask_state::UNALLOCATED,
stream,
mr);
auto mutable_ranks = ranks->mutable_view();

thrust::tabulate(rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
[comparator, resolver] __device__(size_type row_index) {
return resolver(row_index == 0 || !comparator(row_index, row_index - 1),
row_index);
});

thrust::inclusive_scan(rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
mutable_ranks.begin<size_type>(),
scan_op);
return ranks;
}

} // namespace

std::unique_ptr<column> inclusive_dense_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(!cudf::structs::detail::is_or_has_nested_lists(order_by),
"Unsupported list type in dense_rank scan.");
if (has_nested_nulls(table_view{{order_by}})) {
return rank_generator<true>(
order_by,
[] __device__(bool equality, auto row_index) { return equality; },
DeviceSum{},
stream,
mr);
}
return rank_generator<false>(
order_by,
[] __device__(bool equality, auto row_index) { return equality; },
DeviceSum{},
stream,
mr);
}

std::unique_ptr<column> inclusive_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(!cudf::structs::detail::is_or_has_nested_lists(order_by),
"Unsupported list type in rank scan.");
if (has_nested_nulls(table_view{{order_by}})) {
return rank_generator<true>(
order_by,
[] __device__(bool equality, auto row_index) { return equality ? row_index + 1 : 0; },
DeviceMax{},
stream,
mr);
}
return rank_generator<false>(
order_by,
[] __device__(bool equality, auto row_index) { return equality ? row_index + 1 : 0; },
DeviceMax{},
stream,
mr);
}

std::unique_ptr<column> scan_inclusive(
column_view const& input,
std::unique_ptr<aggregation> const& agg,
Expand All @@ -301,9 +203,7 @@ std::unique_ptr<column> scan_inclusive(
{
auto output = scan_agg_dispatch<scan_dispatcher>(input, agg, null_handling, stream, mr);

if (agg->kind == aggregation::RANK || agg->kind == aggregation::DENSE_RANK) {
return output;
} else if (null_handling == null_policy::EXCLUDE) {
if (null_handling == null_policy::EXCLUDE) {
output->set_null_mask(detail::copy_bitmask(input, stream, mr), input.null_count());
} else if (input.nullable()) {
output->set_null_mask(mask_scan(input, scan_type::INCLUSIVE, stream, mr), UNKNOWN_NULL_COUNT);
Expand Down
2 changes: 1 addition & 1 deletion cpp/tests/reductions/scan_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ TEST(RankScanTest, ExclusiveScan)

CUDF_EXPECT_THROW_MESSAGE(
scan(vals, make_dense_rank_aggregation(), scan_type::EXCLUSIVE, null_policy::INCLUDE),
"Unsupported rank aggregation operator for exclusive scan");
"Unsupported dense rank aggregation operator for exclusive scan");
CUDF_EXPECT_THROW_MESSAGE(
scan(vals, make_rank_aggregation(), scan_type::EXCLUSIVE, null_policy::INCLUDE),
"Unsupported rank aggregation operator for exclusive scan");
Expand Down