Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend LEAD/LAG to work with non-fixed-width types #8062

Merged
merged 9 commits into from
May 3, 2021
227 changes: 227 additions & 0 deletions cpp/src/rolling/lead_lag_nested_detail.cuh
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/column/column.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/gather.cuh>
#include <cudf/detail/scatter.cuh>
#include <cudf/utilities/traits.hpp>
#include <vector>

namespace cudf::detail {
namespace {
/**
* @brief Functor to calculate the gather map used for calculating LEAD/LAG.
*
* @tparam op Aggregation Kind (LEAD vs LAG)
* @tparam PrecedingIterator Iterator to retrieve preceding window bounds
* @tparam FollowingIterator Iterator to retrieve following window bounds
*/
template <aggregation::Kind op, typename PrecedingIterator, typename FollowingIterator>
class lead_lag_gather_map_builder {
public:
lead_lag_gather_map_builder(size_type input_size,
size_type row_offset,
PrecedingIterator preceding,
FollowingIterator following)
: _input_size{input_size},
_null_index{input_size + 1}, // Out of input range. Gather returns null.
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
_row_offset{row_offset},
_preceding{preceding},
_following{following}
{
}

template <aggregation::Kind o = op, CUDF_ENABLE_IF(o == aggregation::LEAD)>
size_type __device__ operator()(size_type i)
{
// Note: grouped_*rolling_window() trims preceding/following to
// the beginning/end of the group. `rolling_window()` does not.
// Must trim _following[i] so as not to go past the column end.
auto following = min(_following[i], _input_size - i - 1);
return (_row_offset > following) ? _null_index : (i + _row_offset);
}

template <aggregation::Kind o = op, CUDF_ENABLE_IF(o == aggregation::LAG)>
size_type __device__ operator()(size_type i)
{
// Note: grouped_*rolling_window() trims preceding/following to
// the beginning/end of the group. `rolling_window()` does not.
// Must trim _preceding[i] so as not to go past the column start.
auto preceding = min(_preceding[i], i + 1);
return (_row_offset > (preceding - 1)) ? _null_index : (i - _row_offset);
}

private:
size_type const _input_size; // Number of rows in input to LEAD/LAG.
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
size_type const _null_index; // Index value to use to output NULL for LEAD/LAG calculation.
size_type const _row_offset; // LEAD/LAG offset. E.g. For LEAD(2), _row_offset == 2.
PrecedingIterator _preceding; // Iterator to retrieve preceding window offset.
FollowingIterator _following; // Iterator to retrieve following window offset.
};

/**
* @brief Predicate to find indices at which LEAD/LAG evaluated to null.
*/
template <typename GatherMapIter>
class is_null_index_predicate_impl {
public:
is_null_index_predicate_impl(size_type input_size, GatherMapIter gather_)
: _null_index{input_size + 1}, _gather{gather_}
{
}

bool __device__ operator()(size_type i) const { return _gather[i] == _null_index; }

private:
size_type const _null_index; // Index value to use to output NULL for LEAD/LAG calculation.
GatherMapIter _gather; // Iterator for gather-map entries.
};

/**
* @brief Helper to construct is_null_index_predicate_impl
*/
template <typename GatherMapIter>
is_null_index_predicate_impl<GatherMapIter> is_null_index_predicate(size_type input_size,
GatherMapIter gather)
{
return is_null_index_predicate_impl<GatherMapIter>{input_size, gather};
}

} // namespace

/**
* @brief Helper function to calculate LEAD/LAG for nested-type input columns.
*
* @tparam op The sort of aggregation being done (LEAD vs LAG)
* @tparam InputType The datatype of the input column being aggregated
* @tparam PrecedingIterator Iterator-type that returns the preceding bounds
* @tparam FollowingIterator Iterator-type that returns the following bounds
* @param[in] input Nested-type input column for LEAD/LAG calculation
* @param[in] default_outputs Default values to use as outputs, if LEAD/LAG
* offset crosses column/group boundaries
* @param[in] preceding Iterator to retrieve preceding window bounds
* @param[in] following Iterator to retrieve following window bounds
* @param[in] offset Lead/Lag offset, indicating which row after/before
* the current row is to be returned
* @param[in] stream CUDA stream for device memory operations/allocations
* @param[in] mr device_memory_resource for device memory allocations
*/
template <aggregation::Kind op,
typename InputType,
typename PrecedingIter,
typename FollowingIter,
CUDF_ENABLE_IF(!cudf::is_fixed_width<InputType>())>
std::unique_ptr<column> compute_lead_lag_for_nested(column_view const& input,
column_view const& default_outputs,
PrecedingIter preceding,
FollowingIter following,
size_type offset,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(default_outputs.type().id() == input.type().id(),
"Defaults column type must match input column."); // Because LEAD/LAG.

CUDF_EXPECTS(default_outputs.is_empty() || (input.size() == default_outputs.size()),
"Number of defaults must match input column.");

// For LEAD(0)/LAG(0), no computation need be performed.
// Return copy of input.
if (offset == 0) { return std::make_unique<column>(input, stream, mr); }

// Algorithm:
//
// 1. Construct gather_map with the LEAD/LAG offset applied to the indices.
// E.g. A gather_map of:
// {0, 1, 2, 3, ..., N-3, N-2, N-1}
// would select the input column, unchanged.
//
// For LEAD(2), the following gather_map is used:
// {3, 4, 5, 6, ..., N-1, NULL_INDEX, NULL_INDEX}
// where `NULL_INDEX` selects `NULL` for the gather.
//
// Similarly, LAG(2) is implemented using the following gather_map:
// {NULL_INDEX, NULL_INDEX, 0, 1, 2...}
//
// 2. Gather input column based on the gather_map.
// 3. If default outputs are available, scatter contents of default_outputs`
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
// to all positions where nulls where gathered in step 2.
//
// Note: Step 3 can be switched to use `copy_if_else()`, once it supports
// nested types.
mythrocks marked this conversation as resolved.
Show resolved Hide resolved

auto static constexpr size_data_type = data_type{type_to_id<size_type>()};

auto gather_map_column =
make_numeric_column(size_data_type, input.size(), mask_state::UNALLOCATED, stream);
auto gather_map = gather_map_column->mutable_view();

thrust::transform(rmm::exec_policy(stream),
thrust::make_counting_iterator(size_type{0}),
thrust::make_counting_iterator(size_type{input.size()}),
gather_map.begin<size_type>(),
lead_lag_gather_map_builder<op, PrecedingIter, FollowingIter>{
input.size(), offset, preceding, following});

auto output_with_nulls =
cudf::detail::gather(table_view{std::vector<column_view>{input}},
gather_map_column->view().template begin<size_type>(),
gather_map_column->view().end<size_type>(),
out_of_bounds_policy::NULLIFY,
stream,
mr);

if (default_outputs.is_empty()) { return std::move(output_with_nulls->release()[0]); }

// Must scatter defaults.
auto scatter_map = rmm::device_uvector<size_type>(input.size(), stream);

// Find all indices at which LEAD/LAG computed nulls previously.
auto scatter_map_end =
thrust::copy_if(rmm::exec_policy(stream),
thrust::make_counting_iterator(size_type{0}),
thrust::make_counting_iterator(size_type{input.size()}),
scatter_map.begin(),
is_null_index_predicate(input.size(), gather_map.begin<size_type>()));

// Bail early, if all LEAD/LAG computations succeeded. No defaults need be substituted.
if (scatter_map.is_empty()) { return std::move(output_with_nulls->release()[0]); }

// Gather only those default values that are to be substituted.
auto gathered_defaults =
cudf::detail::gather(table_view{std::vector<column_view>{default_outputs}},
scatter_map.begin(),
scatter_map_end,
out_of_bounds_policy::DONT_CHECK,
stream);

// Scatter defaults into locations where LEAD/LAG computed nulls.
auto scattered_results = cudf::detail::scatter(
table_view{std::vector<column_view>{gathered_defaults->release()[0]->view()}},
scatter_map.begin(),
scatter_map_end,
table_view{std::vector<column_view>{output_with_nulls->release()[0]->view()}},
false,
stream,
mr);
return std::move(scattered_results->release()[0]);
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
}

} // namespace cudf::detail
76 changes: 38 additions & 38 deletions cpp/src/rolling/rolling_detail.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@
#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/concatenate.hpp>
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
#include <cudf/copying.hpp>
#include <cudf/detail/aggregation/aggregation.cuh>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/copy.hpp>
#include <cudf/detail/gather.cuh>
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
#include <cudf/detail/gather.hpp>
#include <cudf/detail/get_value.cuh>
#include <cudf/detail/groupby/sort_helper.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/scatter.cuh>
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/device_operators.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
#include <cudf/detail/valid_if.cuh>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/dictionary/dictionary_factories.hpp>
Expand Down Expand Up @@ -59,6 +63,7 @@
#include <thrust/transform.h>

#include <memory>
#include "lead_lag_nested_detail.cuh"
mythrocks marked this conversation as resolved.
Show resolved Hide resolved

namespace cudf {

Expand Down Expand Up @@ -744,23 +749,14 @@ struct rolling_window_launcher {
std::unique_ptr<column>>
launch(column_view const& input,
column_view const& default_outputs,
PrecedingWindowIterator preceding_window_begin,
FollowingWindowIterator following_window_begin,
PrecedingWindowIterator preceding,
FollowingWindowIterator following,
size_type min_periods,
std::unique_ptr<aggregation> const& agg,
agg_op const& device_agg_op,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(default_outputs.type().id() == input.type().id(),
"Defaults column type must match input column."); // Because LEAD/LAG.

// For LEAD(0)/LAG(0), no computation need be performed.
// Return copy of input.
if (0 == static_cast<cudf::detail::lead_lag_aggregation*>(agg.get())->row_offset) {
return std::make_unique<column>(input, stream, mr);
}

auto output = make_fixed_width_column(
target_type(input.type(), op), input.size(), mask_state::UNINITIALIZED, stream, mr);

Expand All @@ -770,8 +766,8 @@ struct rolling_window_launcher {
input,
default_outputs,
output_view,
preceding_window_begin,
following_window_begin,
preceding,
following,
min_periods,
agg,
device_agg_op,
Expand All @@ -782,30 +778,6 @@ struct rolling_window_launcher {
return output;
}

// Deals with invalid column and/or aggregation options
template <typename T,
typename agg_op,
aggregation::Kind op,
typename PrecedingWindowIterator,
typename FollowingWindowIterator>
std::enable_if_t<!(op == aggregation::LEAD || op == aggregation::LAG) ||
!cudf::is_fixed_width<T>(),
std::unique_ptr<column>>
launch(column_view const& input,
column_view const& default_outputs,
PrecedingWindowIterator preceding_window_begin,
FollowingWindowIterator following_window_begin,
size_type min_periods,
std::unique_ptr<aggregation> const& agg,
agg_op device_agg_op,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FAIL(
"Aggregation operator and/or input type combination is invalid: "
"LEAD/LAG supported only on fixed-width types");
}

template <aggregation::Kind op,
typename PrecedingWindowIterator,
typename FollowingWindowIterator>
Expand Down Expand Up @@ -866,7 +838,9 @@ struct rolling_window_launcher {
template <aggregation::Kind op,
typename PrecedingWindowIterator,
typename FollowingWindowIterator>
std::enable_if_t<(op == aggregation::LEAD || op == aggregation::LAG), std::unique_ptr<column>>
std::enable_if_t<cudf::is_fixed_width<InputType>() &&
(op == aggregation::LEAD || op == aggregation::LAG),
std::unique_ptr<column>>
operator()(column_view const& input,
column_view const& default_outputs,
PrecedingWindowIterator preceding_window_begin,
Expand All @@ -892,6 +866,32 @@ struct rolling_window_launcher {
mr);
}

template <aggregation::Kind op,
typename PrecedingWindowIterator,
typename FollowingWindowIterator>
std::enable_if_t<!cudf::is_fixed_width<InputType>() &&
(op == aggregation::LEAD || op == aggregation::LAG),
std::unique_ptr<column>>
operator()(column_view const& input,
column_view const& default_outputs,
PrecedingWindowIterator preceding_window_begin,
FollowingWindowIterator following_window_begin,
size_type min_periods,
std::unique_ptr<aggregation> const& agg,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
return cudf::detail::
compute_lead_lag_for_nested<op, InputType, PrecedingWindowIterator, FollowingWindowIterator>(
input,
default_outputs,
preceding_window_begin,
following_window_begin,
static_cast<cudf::detail::lead_lag_aggregation*>(agg.get())->row_offset,
stream,
mr);
}

/**
* @brief Creates the offsets child of the result of the `COLLECT_LIST` window aggregation
*
Expand Down
Loading