diff --git a/cpp/src/rolling/lead_lag_nested_detail.cuh b/cpp/src/rolling/lead_lag_nested_detail.cuh new file mode 100644 index 00000000000..a202626fc24 --- /dev/null +++ b/cpp/src/rolling/lead_lag_nested_detail.cuh @@ -0,0 +1,228 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace cudf::detail { +namespace { +/** + * @brief Functor to calculate the gather map used for calculating LEAD/LAG. + * + * @tparam op Aggregation Kind (LEAD vs LAG) + * @tparam PrecedingIterator Iterator to retrieve preceding window bounds + * @tparam FollowingIterator Iterator to retrieve following window bounds + */ +template +class lead_lag_gather_map_builder { + public: + lead_lag_gather_map_builder(size_type input_size, + size_type row_offset, + PrecedingIterator preceding, + FollowingIterator following) + : _input_size{input_size}, + _null_index{input_size}, // Out of input range. Gather returns null. + _row_offset{row_offset}, + _preceding{preceding}, + _following{following} + { + } + + template + size_type __device__ operator()(size_type i) + { + // Note: grouped_*rolling_window() trims preceding/following to + // the beginning/end of the group. `rolling_window()` does not. + // Must trim _following[i] so as not to go past the column end. + auto following = min(_following[i], _input_size - i - 1); + return (_row_offset > following) ? _null_index : (i + _row_offset); + } + + template + size_type __device__ operator()(size_type i) + { + // Note: grouped_*rolling_window() trims preceding/following to + // the beginning/end of the group. `rolling_window()` does not. + // Must trim _preceding[i] so as not to go past the column start. + auto preceding = min(_preceding[i], i + 1); + return (_row_offset > (preceding - 1)) ? _null_index : (i - _row_offset); + } + + private: + size_type const _input_size; // Number of rows in input to LEAD/LAG. + size_type const _null_index; // Index value to use to output NULL for LEAD/LAG calculation. + size_type const _row_offset; // LEAD/LAG offset. E.g. For LEAD(2), _row_offset == 2. + PrecedingIterator _preceding; // Iterator to retrieve preceding window offset. + FollowingIterator _following; // Iterator to retrieve following window offset. +}; + +/** + * @brief Predicate to find indices at which LEAD/LAG evaluated to null. + */ +template +class is_null_index_predicate_impl { + public: + is_null_index_predicate_impl(size_type input_size, GatherMapIter gather_) + : _null_index{input_size}, _gather{gather_} + { + } + + bool __device__ operator()(size_type i) const { return _gather[i] == _null_index; } + + private: + size_type const _null_index; // Index value to use to output NULL for LEAD/LAG calculation. + GatherMapIter _gather; // Iterator for gather-map entries. +}; + +/** + * @brief Helper to construct is_null_index_predicate_impl + */ +template +is_null_index_predicate_impl is_null_index_predicate(size_type input_size, + GatherMapIter gather) +{ + return is_null_index_predicate_impl{input_size, gather}; +} + +} // namespace + +/** + * @brief Helper function to calculate LEAD/LAG for nested-type input columns. + * + * @tparam op The sort of aggregation being done (LEAD vs LAG) + * @tparam InputType The datatype of the input column being aggregated + * @tparam PrecedingIterator Iterator-type that returns the preceding bounds + * @tparam FollowingIterator Iterator-type that returns the following bounds + * @param[in] input Nested-type input column for LEAD/LAG calculation + * @param[in] default_outputs Default values to use as outputs, if LEAD/LAG + * offset crosses column/group boundaries + * @param[in] preceding Iterator to retrieve preceding window bounds + * @param[in] following Iterator to retrieve following window bounds + * @param[in] offset Lead/Lag offset, indicating which row after/before + * the current row is to be returned + * @param[in] stream CUDA stream for device memory operations/allocations + * @param[in] mr device_memory_resource for device memory allocations + */ +template ())> +std::unique_ptr compute_lead_lag_for_nested(column_view const& input, + column_view const& default_outputs, + PrecedingIter preceding, + FollowingIter following, + size_type offset, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_EXPECTS(default_outputs.type().id() == input.type().id(), + "Defaults column type must match input column."); // Because LEAD/LAG. + + CUDF_EXPECTS(default_outputs.is_empty() || (input.size() == default_outputs.size()), + "Number of defaults must match input column."); + + // For LEAD(0)/LAG(0), no computation need be performed. + // Return copy of input. + if (offset == 0) { return std::make_unique(input, stream, mr); } + + // Algorithm: + // + // 1. Construct gather_map with the LEAD/LAG offset applied to the indices. + // E.g. A gather_map of: + // {0, 1, 2, 3, ..., N-3, N-2, N-1} + // would select the input column, unchanged. + // + // For LEAD(2), the following gather_map is used: + // {3, 4, 5, 6, ..., N-1, NULL_INDEX, NULL_INDEX} + // where `NULL_INDEX` selects `NULL` for the gather. + // + // Similarly, LAG(2) is implemented using the following gather_map: + // {NULL_INDEX, NULL_INDEX, 0, 1, 2...} + // + // 2. Gather input column based on the gather_map. + // 3. If default outputs are available, scatter contents of `default_outputs` + // to all positions where nulls where gathered in step 2. + // + // Note: Step 3 can be switched to use `copy_if_else()`, once it supports + // nested types. + + auto static constexpr size_data_type = data_type{type_to_id()}; + + auto gather_map_column = + make_numeric_column(size_data_type, input.size(), mask_state::UNALLOCATED, stream); + auto gather_map = gather_map_column->mutable_view(); + + thrust::transform(rmm::exec_policy(stream), + thrust::make_counting_iterator(size_type{0}), + thrust::make_counting_iterator(size_type{input.size()}), + gather_map.begin(), + lead_lag_gather_map_builder{ + input.size(), offset, preceding, following}); + + auto output_with_nulls = + cudf::detail::gather(table_view{std::vector{input}}, + gather_map_column->view().template begin(), + gather_map_column->view().end(), + out_of_bounds_policy::NULLIFY, + stream, + mr); + + if (default_outputs.is_empty()) { return std::move(output_with_nulls->release()[0]); } + + // Must scatter defaults. + auto scatter_map = rmm::device_uvector(input.size(), stream); + + // Find all indices at which LEAD/LAG computed nulls previously. + auto scatter_map_end = + thrust::copy_if(rmm::exec_policy(stream), + thrust::make_counting_iterator(size_type{0}), + thrust::make_counting_iterator(size_type{input.size()}), + scatter_map.begin(), + is_null_index_predicate(input.size(), gather_map.begin())); + + // Bail early, if all LEAD/LAG computations succeeded. No defaults need be substituted. + if (scatter_map.is_empty()) { return std::move(output_with_nulls->release()[0]); } + + // Gather only those default values that are to be substituted. + auto gathered_defaults = + cudf::detail::gather(table_view{std::vector{default_outputs}}, + scatter_map.begin(), + scatter_map_end, + out_of_bounds_policy::DONT_CHECK, + stream); + + // Scatter defaults into locations where LEAD/LAG computed nulls. + auto scattered_results = cudf::detail::scatter( + table_view{std::vector{gathered_defaults->release()[0]->view()}}, + scatter_map.begin(), + scatter_map_end, + table_view{std::vector{output_with_nulls->release()[0]->view()}}, + false, + stream, + mr); + return std::move(scattered_results->release()[0]); +} + +} // namespace cudf::detail diff --git a/cpp/src/rolling/rolling_detail.cuh b/cpp/src/rolling/rolling_detail.cuh index 4b60c5aec0c..a26dc4c7120 100644 --- a/cpp/src/rolling/rolling_detail.cuh +++ b/cpp/src/rolling/rolling_detail.cuh @@ -16,6 +16,7 @@ #pragma once +#include "lead_lag_nested_detail.cuh" #include "rolling_detail.hpp" #include @@ -744,23 +745,14 @@ struct rolling_window_launcher { std::unique_ptr> launch(column_view const& input, column_view const& default_outputs, - PrecedingWindowIterator preceding_window_begin, - FollowingWindowIterator following_window_begin, + PrecedingWindowIterator preceding, + FollowingWindowIterator following, size_type min_periods, std::unique_ptr const& agg, agg_op const& device_agg_op, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - CUDF_EXPECTS(default_outputs.type().id() == input.type().id(), - "Defaults column type must match input column."); // Because LEAD/LAG. - - // For LEAD(0)/LAG(0), no computation need be performed. - // Return copy of input. - if (0 == static_cast(agg.get())->row_offset) { - return std::make_unique(input, stream, mr); - } - auto output = make_fixed_width_column( target_type(input.type(), op), input.size(), mask_state::UNINITIALIZED, stream, mr); @@ -770,8 +762,8 @@ struct rolling_window_launcher { input, default_outputs, output_view, - preceding_window_begin, - following_window_begin, + preceding, + following, min_periods, agg, device_agg_op, @@ -782,30 +774,6 @@ struct rolling_window_launcher { return output; } - // Deals with invalid column and/or aggregation options - template - std::enable_if_t(), - std::unique_ptr> - launch(column_view const& input, - column_view const& default_outputs, - PrecedingWindowIterator preceding_window_begin, - FollowingWindowIterator following_window_begin, - size_type min_periods, - std::unique_ptr const& agg, - agg_op device_agg_op, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - { - CUDF_FAIL( - "Aggregation operator and/or input type combination is invalid: " - "LEAD/LAG supported only on fixed-width types"); - } - template @@ -866,7 +834,9 @@ struct rolling_window_launcher { template - std::enable_if_t<(op == aggregation::LEAD || op == aggregation::LAG), std::unique_ptr> + std::enable_if_t() && + (op == aggregation::LEAD || op == aggregation::LAG), + std::unique_ptr> operator()(column_view const& input, column_view const& default_outputs, PrecedingWindowIterator preceding_window_begin, @@ -892,6 +862,32 @@ struct rolling_window_launcher { mr); } + template + std::enable_if_t() && + (op == aggregation::LEAD || op == aggregation::LAG), + std::unique_ptr> + operator()(column_view const& input, + column_view const& default_outputs, + PrecedingWindowIterator preceding_window_begin, + FollowingWindowIterator following_window_begin, + size_type min_periods, + std::unique_ptr const& agg, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) + { + return cudf::detail:: + compute_lead_lag_for_nested( + input, + default_outputs, + preceding_window_begin, + following_window_begin, + static_cast(agg.get())->row_offset, + stream, + mr); + } + /** * @brief Creates the offsets child of the result of the `COLLECT_LIST` window aggregation * diff --git a/cpp/tests/rolling/lead_lag_test.cpp b/cpp/tests/rolling/lead_lag_test.cpp index 1cf7d74285c..bc71a7acab9 100644 --- a/cpp/tests/rolling/lead_lag_test.cpp +++ b/cpp/tests/rolling/lead_lag_test.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -28,6 +29,7 @@ #include #include #include +#include #include #include @@ -323,7 +325,6 @@ TYPED_TEST(TypedLeadLagWindowTest, TestLeadLagWithZeroOffsets) following, min_periods, cudf::make_lag_aggregation(0)); - ; expect_columns_equivalent(*lag_0_output_col, *input_col); } @@ -354,7 +355,6 @@ TYPED_TEST(TypedLeadLagWindowTest, TestLeadLagWithNegativeOffsets) following, min_periods, cudf::make_lag_aggregation(-3)); - ; expect_columns_equivalent( *lag_minus_3_output_col, @@ -404,7 +404,6 @@ TYPED_TEST(TypedLeadLagWindowTest, TestLeadLagWithNoGrouping) following, min_periods, cudf::make_lead_aggregation(3)); - ; expect_columns_equivalent( *lead_3_output_col, @@ -513,31 +512,593 @@ TYPED_TEST(TypedLeadLagWindowTest, DefaultValuesWithoutLeadLag) aggs.begin(), aggs.end(), [&](auto& agg) { assert_aggregation_fails(std::move(agg)); }); } -TEST_F(LeadLagWindowTest, LeadLagWithoutFixedWidthInput) +template +struct TypedNestedLeadLagWindowTest : public cudf::test::BaseFixture { +}; + +TYPED_TEST_CASE(TypedNestedLeadLagWindowTest, TypesForTest); + +TYPED_TEST(TypedNestedLeadLagWindowTest, NumericListsWithNullsAllOver) { - // Check that Lead/Lag aren't supported for non-fixed-width types. + using T = TypeParam; + using lcw = lists_column_wrapper; + + auto null_at_2 = cudf::test::iterator_with_null_at(2); + auto const input_col = lcw{{{0, 0}, + {1, 1}, + {2, 2}, + {3, 3, 3}, + {{4, 4, 4, 4}, null_at_2}, + {5, 5, 5, 5, 5}, + {0, 0}, + {10, 10}, + {20, 20}, + {30, 30, 30}, + {40, 40, 40, 40}, + {{50, 50, 50, 50, 50}, null_at_2}}, + null_at_2} + .release(); - auto const input_col = strings_column_wrapper{ - {"0", "1", "2", "3", "4", "5"}, cudf::detail::make_counting_transform_iterator(0, [](auto i) { - return false; - })}.release(); + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; - auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0}; + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + auto lead_3_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lead_aggregation(3)); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT( + lead_3_output_col->view(), + lcw{{{3, 3, 3}, + {{4, 4, 4, 4}, null_at_2}, + {5, 5, 5, 5, 5}, + {}, + {}, + {}, + {30, 30, 30}, + {40, 40, 40, 40}, + {{50, 50, 50, 50, 50}, null_at_2}, + {}, + {}, + {}}, + iterator_with_null_at(std::vector{3, 4, 5, 9, 10, 11})} + .release() + ->view()); + + auto lag_1_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lag_aggregation(1)); + + expect_columns_equivalent(lag_1_output_col->view(), + lcw{{{}, + {0, 0}, + {1, 1}, + {2, 2}, + {3, 3, 3}, + {{4, 4, 4, 4}, null_at_2}, + {}, + {0, 0}, + {10, 10}, + {20, 20}, + {30, 30, 30}, + {40, 40, 40, 40}}, + iterator_with_null_at(std::vector{0, 3, 6})} + .release() + ->view()); +} + +TYPED_TEST(TypedNestedLeadLagWindowTest, NumericListsWithDefaults) +{ + using T = TypeParam; + using lcw = lists_column_wrapper; + + auto null_at_2 = cudf::test::iterator_with_null_at(2); + auto const input_col = lcw{{{0, 0}, + {1, 1}, + {2, 2}, + {3, 3, 3}, + {{4, 4, 4, 4}, null_at_2}, + {5, 5, 5, 5, 5}, + {0, 0}, + {10, 10}, + {20, 20}, + {30, 30, 30}, + {40, 40, 40, 40}, + {{50, 50, 50, 50, 50}, null_at_2}}, + null_at_2} + .release(); + + auto const defaults_col = + lcw{ + { + {}, + {91, 91}, + {92, 92}, + {}, // null! + {94, 94, 94}, + {95, 95}, + {}, + {91, 91}, + {92, 92}, + {}, // null! + {94, 94, 94}, + {95, 95}, + }, + } + .release(); + + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1}; auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; - auto const default_value = cudf::make_string_scalar("99"); - auto const default_outputs = cudf::make_column_from_scalar(*default_value, input_col->size()); + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + auto lead_3_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lead_aggregation(3)); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT( + lead_3_output_col->view(), + lcw{{{3, 3, 3}, + {{4, 4, 4, 4}, null_at_2}, + {5, 5, 5, 5, 5}, + {}, + {}, + {}, + {30, 30, 30}, + {40, 40, 40, 40}, + {{50, 50, 50, 50, 50}, null_at_2}, + {}, + {}, + {}}, + iterator_with_null_at(std::vector{3, 4, 5, 9, 10, 11})} + .release() + ->view()); + + auto lag_1_output_col = cudf::grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lag_aggregation(1)); + + expect_columns_equivalent(lag_1_output_col->view(), + lcw{{{}, + {0, 0}, + {1, 1}, + {2, 2}, + {3, 3, 3}, + {{4, 4, 4, 4}, null_at_2}, + {}, + {0, 0}, + {10, 10}, + {20, 20}, + {30, 30, 30}, + {40, 40, 40, 40}}, + iterator_with_null_at(std::vector{0, 3, 6})} + .release() + ->view()); +} + +TYPED_TEST(TypedNestedLeadLagWindowTest, Structs) +{ + using T = TypeParam; + using lcw = lists_column_wrapper; + + auto null_at_2 = cudf::test::iterator_with_null_at(2); + auto lists_col = lcw{{{0, 0}, + {1, 1}, + {2, 2}, + {3, 3, 3}, + {{4, 4, 4, 4}, null_at_2}, + {5, 5, 5, 5, 5}, + {0, 0}, + {10, 10}, + {20, 20}, + {30, 30, 30}, + {40, 40, 40, 40}, + {{50, 50, 50, 50, 50}, null_at_2}}, + null_at_2}; + + auto strings_col = strings_column_wrapper{{"00", + "11", + "22", + "333", + "4444", + "55555", + "00", + "1010", + "2020", + "303030", + "40404040", + "5050505050"}, + iterator_with_null_at(9)}; + + auto structs_col = structs_column_wrapper{lists_col, strings_col}.release(); + + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + // Test LEAD(). + { + auto lead_3_output_col = cudf::grouped_rolling_window(grouping_keys, + structs_col->view(), + preceding, + following, + min_periods, + cudf::make_lead_aggregation(3)); + auto expected_lists_col = + lcw{{{3, 3, 3}, + {{4, 4, 4, 4}, null_at_2}, + {5, 5, 5, 5, 5}, + {}, + {}, + {}, + {30, 30, 30}, + {40, 40, 40, 40}, + {{50, 50, 50, 50, 50}, null_at_2}, + {}, + {}, + {}}, + iterator_with_null_at(std::vector{3, 4, 5, 9, 10, 11})}; + auto expected_strings_col = strings_column_wrapper{ + {"333", "4444", "55555", "", "", "", "", "40404040", "5050505050", "", "", ""}, + iterator_with_null_at(std::vector{3, 4, 5, 6, 9, 10, 11})}; + + auto expected_structs_col = + structs_column_wrapper{{expected_lists_col, expected_strings_col}, + iterator_with_null_at(std::vector{3, 4, 5, 9, 10, 11})} + .release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(lead_3_output_col->view(), expected_structs_col->view()); + } + + // Test LAG() + { + auto lag_1_output_col = cudf::grouped_rolling_window(grouping_keys, + structs_col->view(), + preceding, + following, + min_periods, + cudf::make_lag_aggregation(1)); + auto expected_lists_col = lcw{{{}, // null. + {0, 0}, + {1, 1}, + {}, // null. + {3, 3, 3}, + {{4, 4, 4, 4}, null_at_2}, + {}, // null. + {0, 0}, + {10, 10}, + {20, 20}, + {30, 30, 30}, + {40, 40, 40, 40}}, + iterator_with_null_at(std::vector{0, 3, 6})}; + auto expected_strings_col = + strings_column_wrapper{{"", // null. + "00", + "11", + "22", + "333", + "4444", + "", // null. + "00", + "1010", + "2020", + "", // null. + "40404040"}, + iterator_with_null_at(std::vector{0, 6, 10})}; + + auto expected_structs_col = + structs_column_wrapper{{expected_lists_col, expected_strings_col}, + iterator_with_null_at(std::vector{0, 6})} + .release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(lag_1_output_col->view(), expected_structs_col->view()); + } +} + +struct LeadLagNonFixedWidthTest : cudf::test::BaseFixture { +}; + +TEST_F(LeadLagNonFixedWidthTest, StringsNoDefaults) +{ + using namespace cudf; + using namespace cudf::test; + + auto input_col = strings_column_wrapper{{"", + "A_1", + "A_22", + "A_333", + "A_4444", + "A_55555", + "B_0", + "", + "B_22", + "B_333", + "B_4444", + "B_55555"}, + iterator_with_null_at(std::vector{0, 7})} + .release(); + + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + auto lead_2 = grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lead_aggregation(2)); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT( + lead_2->view(), + strings_column_wrapper{ + {"A_22", "A_333", "A_4444", "A_55555", "", "", "B_22", "B_333", "B_4444", "B_55555", "", ""}, + iterator_with_null_at(std::vector{4, 5, 10, 11})}); + + auto lag_1 = grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lag_aggregation(1)); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT( + lag_1->view(), + strings_column_wrapper{ + {"", "", "A_1", "A_22", "A_333", "A_4444", "", "B_0", "", "B_22", "B_333", "B_4444"}, + iterator_with_null_at(std::vector{0, 1, 6, 8})}); +} + +TEST_F(LeadLagNonFixedWidthTest, StringsWithDefaults) +{ + using namespace cudf; + using namespace cudf::test; + + auto input_col = strings_column_wrapper{{"", + "A_1", + "A_22", + "A_333", + "A_4444", + "A_55555", + "B_0", + "", + "B_22", + "B_333", + "B_4444", + "B_55555"}, + iterator_with_null_at(std::vector{0, 7})} + .release(); + + auto defaults_col = strings_column_wrapper{"9999", + "9999", + "9999", + "9999", + "9999", + "9999", + "9999", + "9999", + "9999", + "9999", + "9999", + "9999"} + .release(); + + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + auto lead_2 = grouped_rolling_window(grouping_keys, + input_col->view(), + defaults_col->view(), + preceding, + following, + min_periods, + cudf::make_lead_aggregation(2)); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(lead_2->view(), + strings_column_wrapper{"A_22", + "A_333", + "A_4444", + "A_55555", + "9999", + "9999", + "B_22", + "B_333", + "B_4444", + "B_55555", + "9999", + "9999"}); + + auto lag_1 = grouped_rolling_window(grouping_keys, + input_col->view(), + defaults_col->view(), + preceding, + following, + min_periods, + cudf::make_lag_aggregation(1)); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT( + lag_1->view(), + strings_column_wrapper{ + {"9999", "", "A_1", "A_22", "A_333", "A_4444", "9999", "B_0", "", "B_22", "B_333", "B_4444"}, + iterator_with_null_at(std::vector{1, 8})}); +} + +TEST_F(LeadLagNonFixedWidthTest, StringsWithDefaultsNoGroups) +{ + using namespace cudf; + using namespace cudf::test; + + auto input_col = strings_column_wrapper{{"", + "A_1", + "A_22", + "A_333", + "A_4444", + "A_55555", + "B_0", + "", + "B_22", + "B_333", + "B_4444", + "B_55555"}, + iterator_with_null_at(std::vector{0, 7})} + .release(); + + auto defaults_col = strings_column_wrapper{"9999", + "9999", + "9999", + "9999", + "9999", + "9999", + "9999", + "9999", + "9999", + "9999", + "9999", + "9999"} + .release(); + + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; + + auto const preceding = 4; + auto const following = 3; + auto const min_periods = 1; + + auto lead_2 = grouped_rolling_window(grouping_keys, + input_col->view(), + defaults_col->view(), + preceding, + following, + min_periods, + cudf::make_lead_aggregation(2)); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(lead_2->view(), + strings_column_wrapper{{"A_22", + "A_333", + "A_4444", + "A_55555", + "B_0", + "", + "B_22", + "B_333", + "B_4444", + "B_55555", + "9999", + "9999"}, + iterator_with_null_at(5)}); + + auto lag_1 = grouped_rolling_window(grouping_keys, + input_col->view(), + defaults_col->view(), + preceding, + following, + min_periods, + cudf::make_lag_aggregation(1)); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT( + lag_1->view(), + strings_column_wrapper{{"9999", + "", + "A_1", + "A_22", + "A_333", + "A_4444", + "A_55555", + "B_0", + "", + "B_22", + "B_333", + "B_4444"}, + iterator_with_null_at(std::vector{1, 8})}); +} + +TEST_F(LeadLagNonFixedWidthTest, Dictionary) +{ + using namespace cudf; + using namespace cudf::test; + + using dictionary = cudf::test::dictionary_column_wrapper; + + auto input_strings = std::initializer_list{"", + "A_1", + "A_22", + "A_333", + "A_4444", + "A_55555", + "B_0", + "", + "B_22", + "B_333", + "B_4444", + "B_55555"}; + auto input_col = dictionary{input_strings}.release(); + + auto const grouping_key = fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1}; + auto const grouping_keys = cudf::table_view{std::vector{grouping_key}}; auto const preceding = 4; auto const following = 3; auto const min_periods = 1; - EXPECT_THROW(cudf::grouped_rolling_window(grouping_keys, - input_col->view(), - default_outputs->view(), - preceding, - following, - min_periods, - cudf::make_lead_aggregation(4)), - cudf::logic_error); + { + auto lead_2 = grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lead_aggregation(2)); + + auto expected_keys = strings_column_wrapper{input_strings}.release(); + auto expected_values = + fixed_width_column_wrapper{{2, 3, 4, 5, 0, 0, 7, 8, 9, 10, 0, 0}, + iterator_with_null_at(std::vector{4, 5, 10, 11})} + .release(); + auto expected_output = + make_dictionary_column(expected_keys->view(), expected_values->view()).release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(lead_2->view(), expected_output->view()); + } + + { + auto lag_1 = grouped_rolling_window(grouping_keys, + input_col->view(), + preceding, + following, + min_periods, + cudf::make_lag_aggregation(1)); + + auto expected_keys = strings_column_wrapper{input_strings}.release(); + auto expected_values = + fixed_width_column_wrapper{{0, 0, 1, 2, 3, 4, 0, 6, 0, 7, 8, 9}, + iterator_with_null_at(std::vector{0, 6})} + .release(); + auto expected_output = + make_dictionary_column(expected_keys->view(), expected_values->view()).release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(lag_1->view(), expected_output->view()); + } }