Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement cudf::rolling for decimal32 and decimal64 #7037

Merged
merged 4 commits into from
Dec 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,13 @@ std::unique_ptr<aggregation> make_udf_aggregation(udf_type type,
namespace detail {
namespace {
// Functor that maps a (source type, aggregation kind) pair to the data_type
// of the aggregation result.
struct target_type_functor {
  // Source column type; carries the fixed-point scale needed for decimal results.
  data_type type;

  template <typename Source, aggregation::Kind k>
  constexpr data_type operator()() const noexcept
  {
    auto const id = type_to_id<target_type_t<Source, k>>();
    // data_type's (id, scale) constructor asserts that id is DECIMAL32 or
    // DECIMAL64, so the scale-carrying form is used only for decimal outputs;
    // all other types take the single-argument constructor.
    return id == type_id::DECIMAL32 || id == type_id::DECIMAL64 ? data_type{id, type.scale()}
                                                                : data_type{id};
  }
};

Expand All @@ -174,7 +177,7 @@ struct is_valid_aggregation_impl {
// Return target data_type for the given source_type and aggregation
data_type target_type(data_type source, aggregation::Kind k)
{
  // Dispatch on both the source type and the aggregation kind. The functor is
  // constructed with the source type so decimal results can preserve its scale.
  return dispatch_type_and_aggregation(source, k, target_type_functor{source});
}

// Verifies the aggregation `k` is valid on the type `source`
Expand Down
14 changes: 10 additions & 4 deletions cpp/src/rolling/rolling_detail.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ struct rolling_window_launcher {
std::unique_ptr<aggregation> const& agg,
rmm::cuda_stream_view stream)
{
using Type = device_storage_type_t<T>;
using OutType = device_storage_type_t<target_type_t<InputType, op>>;

constexpr cudf::size_type block_size = 256;
cudf::detail::grid_1d grid(input.size(), block_size);

Expand All @@ -516,7 +519,7 @@ struct rolling_window_launcher {
rmm::device_scalar<size_type> device_valid_count{0, stream};

if (input.has_nulls()) {
gpu_rolling<T, target_type_t<InputType, op>, agg_op, op, block_size, true>
gpu_rolling<Type, OutType, agg_op, op, block_size, true>
<<<grid.num_blocks, block_size, 0, stream.value()>>>(*input_device_view,
*default_outputs_device_view,
*output_device_view,
Expand All @@ -525,7 +528,7 @@ struct rolling_window_launcher {
following_window_begin,
min_periods);
} else {
gpu_rolling<T, target_type_t<InputType, op>, agg_op, op, block_size, false>
gpu_rolling<Type, OutType, agg_op, op, block_size, false>
<<<grid.num_blocks, block_size, 0, stream.value()>>>(*input_device_view,
*default_outputs_device_view,
*output_device_view,
Expand Down Expand Up @@ -558,6 +561,9 @@ struct rolling_window_launcher {
agg_op const& device_agg_op,
rmm::cuda_stream_view stream)
{
using Type = device_storage_type_t<T>;
using OutType = device_storage_type_t<target_type_t<InputType, op>>;

constexpr cudf::size_type block_size = 256;
cudf::detail::grid_1d grid(input.size(), block_size);

Expand All @@ -568,7 +574,7 @@ struct rolling_window_launcher {
rmm::device_scalar<size_type> device_valid_count{0, stream};

if (input.has_nulls()) {
gpu_rolling<T, target_type_t<InputType, op>, agg_op, op, block_size, true>
gpu_rolling<Type, OutType, agg_op, op, block_size, true>
<<<grid.num_blocks, block_size, 0, stream.value()>>>(*input_device_view,
*default_outputs_device_view,
*output_device_view,
Expand All @@ -578,7 +584,7 @@ struct rolling_window_launcher {
min_periods,
device_agg_op);
} else {
gpu_rolling<T, target_type_t<InputType, op>, agg_op, op, block_size, false>
gpu_rolling<Type, OutType, agg_op, op, block_size, false>
<<<grid.num_blocks, block_size, 0, stream.value()>>>(*input_device_view,
*default_outputs_device_view,
*output_device_view,
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/rolling/rolling_detail.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ static constexpr bool is_rolling_supported()
return (op == aggregation::MIN) or (op == aggregation::MAX) or
(op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
(op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or (op == aggregation::LAG);

} else if (cudf::is_fixed_point<ColumnType>()) {
return (op == aggregation::MIN) or (op == aggregation::MAX) or
(op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
(op == aggregation::LEAD) or (op == aggregation::LAG);
} else if (std::is_same<ColumnType, cudf::string_view>()) {
return (op == aggregation::MIN) or (op == aggregation::MAX) or
(op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
Expand Down
78 changes: 78 additions & 0 deletions cpp/tests/rolling/rolling_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -952,4 +952,82 @@ TEST_F(RollingTestUdf, DynamicWindow)
CUDF_TEST_EXPECT_COLUMNS_EQUAL(*output, expected);
}

// Typed test fixture for fixed-point (decimal32/decimal64) rolling-window tests.
template <typename T>
struct FixedPointTests : cudf::test::BaseFixture {
};

// Instantiate the fixture for every fixed-point type (decimal32 and decimal64).
TYPED_TEST_CASE(FixedPointTests, cudf::test::FixedPointTypes);

// Rolling MIN/MAX/COUNT/LAG/LEAD over an all-valid decimal column.
// Window for row i is [i-1, i+1] clamped to the column bounds
// (preceding = 2 includes the current row, following = 1, min_periods = 1).
TYPED_TEST(FixedPointTests, MinMaxCountLagLead)
{
  using namespace numeric;
  using namespace cudf;
  using decimalXX  = TypeParam;
  using RepType    = cudf::device_storage_type_t<decimalXX>;
  using fp_wrapper = cudf::test::fixed_point_column_wrapper<RepType>;
  using fw_wrapper = cudf::test::fixed_width_column_wrapper<size_type>;

  auto const scale              = scale_type{-1};
  auto const input              = fp_wrapper{{42, 1729, 55, 3, 1, 2}, {1, 1, 1, 1, 1, 1}, scale};
  auto const expected_min       = fp_wrapper{{42, 42, 3, 1, 1, 1}, {1, 1, 1, 1, 1, 1}, scale};
  auto const expected_max       = fp_wrapper{{1729, 1729, 1729, 55, 3, 2}, {1, 1, 1, 1, 1, 1}, scale};
  auto const expected_lag       = fp_wrapper{{0, 42, 1729, 55, 3, 1}, {0, 1, 1, 1, 1, 1}, scale};
  auto const expected_lead      = fp_wrapper{{1729, 55, 3, 1, 2, 0}, {1, 1, 1, 1, 1, 0}, scale};
  auto const expected_count_val = fw_wrapper{{2, 3, 3, 3, 3, 2}, {1, 1, 1, 1, 1, 1}};
  auto const expected_count_all = fw_wrapper{{2, 3, 3, 3, 3, 2}, {1, 1, 1, 1, 1, 1}};

  auto const min   = rolling_window(input, 2, 1, 1, make_min_aggregation());
  auto const max   = rolling_window(input, 2, 1, 1, make_max_aggregation());
  auto const lag   = rolling_window(input, 2, 1, 1, make_lag_aggregation(1));
  auto const lead  = rolling_window(input, 2, 1, 1, make_lead_aggregation(1));
  auto const valid = rolling_window(input, 2, 1, 1, make_count_aggregation());
  auto const all   = rolling_window(input, 2, 1, 1, make_count_aggregation(null_policy::INCLUDE));
  // ROW_NUMBER is not supported for fixed-point columns and must throw.
  EXPECT_THROW(rolling_window(input, 2, 1, 1, make_row_number_aggregation()), cudf::logic_error);

  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_min, min->view());
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_max, max->view());
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_lag, lag->view());
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_lead, lead->view());
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_val, valid->view());
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_all, all->view());
}

// Same aggregations as MinMaxCountLagLead, but with nulls interspersed in the
// input: MIN/MAX/COUNT_VALID skip null rows, COUNT_ALL includes them, and
// LAG/LEAD carry the null flags through.
TYPED_TEST(FixedPointTests, MinMaxCountLagLeadNulls)
{
  using namespace numeric;
  using namespace cudf;
  using decimalXX  = TypeParam;
  using RepType    = cudf::device_storage_type_t<decimalXX>;
  using fp_wrapper = cudf::test::fixed_point_column_wrapper<RepType>;
  using fw_wrapper = cudf::test::fixed_width_column_wrapper<size_type>;

  auto const scale = scale_type{-1};
  auto const input = fp_wrapper{{42, 1729, 55, 343, 1, 2}, {1, 0, 1, 0, 1, 1}, scale};

  // Run every supported aggregation with window [i-1, i+1], min_periods = 1.
  auto const result_min  = rolling_window(input, 2, 1, 1, make_min_aggregation());
  auto const result_max  = rolling_window(input, 2, 1, 1, make_max_aggregation());
  auto const result_lag  = rolling_window(input, 2, 1, 1, make_lag_aggregation(1));
  auto const result_lead = rolling_window(input, 2, 1, 1, make_lead_aggregation(1));
  auto const count_valid = rolling_window(input, 2, 1, 1, make_count_aggregation());
  auto const count_all = rolling_window(input, 2, 1, 1, make_count_aggregation(null_policy::INCLUDE));
  EXPECT_THROW(rolling_window(input, 2, 1, 1, make_row_number_aggregation()), cudf::logic_error);

  auto const expected_min       = fp_wrapper{{42, 42, 55, 1, 1, 1}, {1, 1, 1, 1, 1, 1}, scale};
  auto const expected_max       = fp_wrapper{{42, 55, 55, 55, 2, 2}, {1, 1, 1, 1, 1, 1}, scale};
  auto const expected_lag       = fp_wrapper{{0, 42, 1729, 55, 343, 1}, {0, 1, 0, 1, 0, 1}, scale};
  auto const expected_lead      = fp_wrapper{{1729, 55, 343, 1, 2, 0}, {0, 1, 0, 1, 1, 0}, scale};
  auto const expected_count_val = fw_wrapper{{1, 2, 1, 2, 2, 2}, {1, 1, 1, 1, 1, 1}};
  auto const expected_count_all = fw_wrapper{{2, 3, 3, 3, 3, 2}, {1, 1, 1, 1, 1, 1}};

  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_min, result_min->view());
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_max, result_max->view());
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_lag, result_lag->view());
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_lead, result_lead->view());
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_val, count_valid->view());
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_all, count_all->view());
}

CUDF_TEST_PROGRAM_MAIN()