Skip to content

Commit

Permalink
Merge pull request #3448 from trevorsm7/fea-port-scatter-to-tables
Browse files Browse the repository at this point in the history
[REVIEW] Port scatter to tables to libcudf++
  • Loading branch information
trevorsm7 authored Nov 28, 2019
2 parents 265c500 + 55fdd75 commit 409f560
Show file tree
Hide file tree
Showing 8 changed files with 473 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
- PR #3425 Strings column copy_if_else implementation
- PR #3422 Move utilities to legacy
- PR #3201 Define and implement new datetime_ops APIs
- PR #3448 Port scatter_to_tables to libcudf++
- PR #3458 Update strings sections in the transition guide
- PR #3462 Add `make_empty_column` and update `empty_like`.
- PR #3214 Define and implement new unary operations APIs
Expand Down
37 changes: 37 additions & 0 deletions cpp/include/cudf/copying.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,43 @@ std::unique_ptr<table> scatter(
table_view const& target, bool check_bounds = false,
rmm::mr::device_memory_resource* mr = rmm::mr::get_default_resource());

/**
 * @brief Scatters the rows of a table to `n` tables according to a partition map
 *
 * Copies the rows from the input table to new tables according to the table
 * indices given by partition_map. The number of output tables is one more than
 * the maximum value in `partition_map`.
 *
 * Output table `i` in `[0, n)` is empty if `i` does not appear in
 * `partition_map`.
 *
 * @throw cudf::logic_error when partition_map is a non-integer type
 * @throw cudf::logic_error when partition_map is larger than input
 * @throw cudf::logic_error when partition_map has nulls
 *
 * Example:
 * input:         [{10, 12, 14, 16, 18, 20, 22, 24, 26, 28},
 *                 { 1,  2,  3,  4, null, 0, 2,  4,  6,  2}]
 * partition_map: {3, 4, 3, 1, 4, 4, 0, 1, 1, 1}
 * output:    {[{22}, {2}],
 *             [{16, 24, 26, 28}, {4, 4, 6, 2}],
 *             [{}, {}],
 *             [{10, 14}, {1, 3}],
 *             [{12, 18, 20}, {2, null, 0}]}
 *
 * @param input Table of rows to be partitioned into a set of output
 * tables according to `partition_map`
 * @param partition_map Non-null column of integer values that map
 * each row in `input` table into one of the output tables
 * @param mr The resource to use for all allocations
 *
 * @return A vector of tables containing the scattered rows of `input`.
 * `table` `i` contains all rows `j` from `input` where `partition_map[j] == i`.
 */
std::vector<std::unique_ptr<table>> scatter_to_tables(
  table_view const& input, column_view const& partition_map,
  rmm::mr::device_memory_resource* mr = rmm::mr::get_default_resource());

/** ---------------------------------------------------------------------------*
* @brief Indicates when to allocate a mask, based on an existing mask.
* ---------------------------------------------------------------------------**/
Expand Down
12 changes: 0 additions & 12 deletions cpp/include/cudf/detail/copy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,6 @@ std::unique_ptr<column> allocate_like(column_view const& input, size_type size,
rmm::mr::get_default_resource(),
cudaStream_t stream = 0);

/**
* @brief Creates a table of empty columns with the same types as the `input_table`
*
* Creates the `cudf::column` objects, but does not allocate any underlying device
* memory for the column's data or bitmask.
*
* @param[in] input_table Immutable view of input table to emulate
* @param[in] stream Optional CUDA stream on which to execute kernels
* @return std::unique_ptr<table> A table of empty columns with the same types as the columns in `input_table`
*/
std::unique_ptr<table> empty_like(table_view const& input_table, cudaStream_t stream = 0);

/**
* @brief Returns a new column, where each element is selected from either @p lhs or
* @p rhs based on the value of the corresponding element in @p boolean_mask
Expand Down
39 changes: 39 additions & 0 deletions cpp/include/cudf/detail/scatter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,45 @@ std::unique_ptr<table> scatter(
rmm::mr::device_memory_resource* mr = rmm::mr::get_default_resource(),
cudaStream_t stream = 0);

/**
 * @brief Scatters the rows of a table to `n` tables according to a partition map
 *
 * Copies the rows from the input table to new tables according to the table
 * indices given by partition_map. The number of output tables is one more than
 * the maximum value in `partition_map`.
 *
 * Output table `i` in `[0, n)` is empty if `i` does not appear in
 * `partition_map`.
 *
 * @throw cudf::logic_error when partition_map is a non-integer type
 * @throw cudf::logic_error when partition_map is larger than input
 * @throw cudf::logic_error when partition_map has nulls
 *
 * Example:
 * input:         [{10, 12, 14, 16, 18, 20, 22, 24, 26, 28},
 *                 { 1,  2,  3,  4, null, 0, 2,  4,  6,  2}]
 * partition_map: {3, 4, 3, 1, 4, 4, 0, 1, 1, 1}
 * output:    {[{22}, {2}],
 *             [{16, 24, 26, 28}, {4, 4, 6, 2}],
 *             [{}, {}],
 *             [{10, 14}, {1, 3}],
 *             [{12, 18, 20}, {2, null, 0}]}
 *
 * @param input Table of rows to be partitioned into a set of output
 * tables according to `partition_map`
 * @param partition_map Non-null column of integer values that map
 * each row in `input` table into one of the output tables
 * @param mr The resource to use for all allocations
 * @param stream The stream to use for CUDA operations
 *
 * @return A vector of tables containing the scattered rows of `input`.
 * `table` `i` contains all rows `j` from `input` where `partition_map[j] == i`.
 */
std::vector<std::unique_ptr<table>> scatter_to_tables(
  table_view const& input, column_view const& partition_map,
  rmm::mr::device_memory_resource* mr = rmm::mr::get_default_resource(),
  cudaStream_t stream = 0);

} // namespace detail
} // namespace experimental
} // namespace cudf
97 changes: 95 additions & 2 deletions cpp/src/copying/scatter.cu
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <cudf/detail/copy.hpp>
#include <cudf/detail/scatter.hpp>
#include <cudf/detail/gather.cuh>
#include <cudf/detail/gather.hpp>
#include <cudf/utilities/traits.hpp>
#include <cudf/column/column_device_view.cuh>
#include <cudf/table/table_device_view.cuh>
Expand Down Expand Up @@ -136,7 +137,8 @@ struct scatter_impl {
auto const begin = -target.num_rows();
auto const end = target.num_rows();
auto bounds = bounds_checker<T>{begin, end};
CUDF_EXPECTS(thrust::all_of(rmm::exec_policy(stream)->on(stream),
CUDF_EXPECTS(scatter_map.size() == thrust::count_if(
rmm::exec_policy(stream)->on(stream),
scatter_map.begin<T>(), scatter_map.end<T>(), bounds),
"Scatter map index out of bounds");
}
Expand Down Expand Up @@ -271,7 +273,8 @@ struct scatter_scalar_impl {
auto const begin = -target.num_rows();
auto const end = target.num_rows();
auto bounds = bounds_checker<T>{begin, end};
CUDF_EXPECTS(thrust::all_of(rmm::exec_policy(stream)->on(stream),
CUDF_EXPECTS(indices.size() == thrust::count_if(
rmm::exec_policy(stream)->on(stream),
indices.begin<T>(), indices.end<T>(), bounds),
"Scatter map index out of bounds");
}
Expand Down Expand Up @@ -306,6 +309,72 @@ struct scatter_scalar_impl {
}
};

// Type-dispatched functor: the enabled overload is selected when the
// partition map's element type T is a non-boolean integral; every other
// type falls through to the CUDF_FAIL overload below.
struct scatter_to_tables_impl {
  template <typename T, std::enable_if_t<std::is_integral<T>::value
      and not std::is_same<T, bool8>::value>* = nullptr>
  std::vector<std::unique_ptr<table>> operator()(
      table_view const& input, column_view const& partition_map,
      rmm::mr::device_memory_resource* mr, cudaStream_t stream)
  {
    // Make a mutable copy of the partition map (sort below mutates the keys)
    auto d_partitions = rmm::device_vector<T>(
        partition_map.begin<T>(), partition_map.end<T>());

    // Initialize gather maps and offsets to sequence [0, input rows)
    auto d_gather_maps = rmm::device_vector<size_type>(partition_map.size());
    auto d_offsets = rmm::device_vector<size_type>(partition_map.size());
    thrust::sequence(rmm::exec_policy(stream)->on(stream),
        d_gather_maps.begin(), d_gather_maps.end());
    thrust::sequence(rmm::exec_policy(stream)->on(stream),
        d_offsets.begin(), d_offsets.end());

    // Sort sequence using partition map as key to generate gather maps.
    // stable_sort keeps rows of the same partition in input order.
    thrust::stable_sort_by_key(rmm::exec_policy(stream)->on(stream),
        d_partitions.begin(), d_partitions.end(), d_gather_maps.begin());

    // Reduce unique partitions to extract gather map offsets from sequence:
    // after this, d_partitions[0..k) holds the distinct partition indices and
    // d_offsets[0..k) holds where each partition's run begins in d_gather_maps
    auto end = thrust::unique_by_key(rmm::exec_policy(stream)->on(stream),
        d_partitions.begin(), d_partitions.end(), d_offsets.begin());

    // Copy partition indices and gather map offsets to host
    auto partitions = thrust::host_vector<T>(d_partitions.begin(), end.first);
    auto offsets = thrust::host_vector<size_type>(d_offsets.begin(), end.second);
    // Sentinel so size of the last partition is offsets[i+1] - offsets[i]
    offsets.push_back(partition_map.size());

    // Keys are sorted ascending, so front() is the minimum partition index
    CUDF_EXPECTS(partitions.front() >= 0, "Invalid negative partition index");
    // One output slot per index in [0, max partition]; back() is the maximum
    auto output = std::vector<std::unique_ptr<table>>(partitions.back() + 1);

    size_t next_partition = 0;
    for (size_t index = 0; index < partitions.size(); ++index) {
      auto const partition = static_cast<size_t>(partitions[index]);

      // Create empty tables for unused partitions
      for (; next_partition < partition; ++next_partition) {
        output[next_partition] = empty_like(input);
      }

      // Gather input rows for the current partition (second dispatch for
      // column types). The column_view is a non-owning wrapper over this
      // partition's slice of d_gather_maps; INT32 matches cudf::size_type.
      auto const data = d_gather_maps.data().get() + offsets[index];
      auto const size = offsets[index + 1] - offsets[index];
      auto const gather_map = column_view(data_type(INT32), size, data);
      output[partition] = gather(input, gather_map, false, false, false, mr, stream);

      next_partition = partition + 1;
    }

    return output;
  }

  // Rejected types: partition indices must be integral and non-boolean
  template <typename T, std::enable_if_t<not std::is_integral<T>::value
      or std::is_same<T, bool8>::value>* = nullptr>
  std::vector<std::unique_ptr<table>> operator()(
      table_view const& input, column_view const& partition_map,
      rmm::mr::device_memory_resource* mr, cudaStream_t stream)
  {
    CUDF_FAIL("Partition map column must be an integral, non-boolean type");
  }
};

} // namespace

std::unique_ptr<table> scatter(
Expand Down Expand Up @@ -356,6 +425,23 @@ std::unique_ptr<table> scatter(
indices, target, check_bounds, mr, stream);
}

/**
 * @brief Implementation of `cudf::experimental::scatter_to_tables`.
 *
 * Validates the partition map, short-circuits on empty input, then
 * dispatches on the partition map's index type.
 *
 * @throw cudf::logic_error if `partition_map` has more rows than `input`
 * @throw cudf::logic_error if `partition_map` contains nulls
 */
std::vector<std::unique_ptr<table>> scatter_to_tables(
    table_view const& input, column_view const& partition_map,
    rmm::mr::device_memory_resource* mr,
    cudaStream_t stream)
{
  // NOTE: messages name the partition map (previously said "scatter map",
  // a copy-paste from scatter, which produced misleading diagnostics)
  CUDF_EXPECTS(partition_map.size() <= input.num_rows(), "Partition map larger than input");
  CUDF_EXPECTS(partition_map.has_nulls() == false, "Partition map contains nulls");

  // Nothing to partition: return an empty vector of tables
  if (partition_map.size() == 0 || input.num_rows() == 0) {
    return std::vector<std::unique_ptr<table>>{};
  }

  // First dispatch for scatter index type
  return type_dispatcher(partition_map.type(), scatter_to_tables_impl{},
    input, partition_map, mr, stream);
}

} // namespace detail

std::unique_ptr<table> scatter(
Expand All @@ -374,5 +460,12 @@ std::unique_ptr<table> scatter(
return detail::scatter(source, indices, target, check_bounds, mr);
}

/**
 * @brief Public entry point for scatter_to_tables.
 *
 * Thin forwarding wrapper: all validation and dispatch happen in
 * `detail::scatter_to_tables`, run here on the default CUDA stream.
 */
std::vector<std::unique_ptr<table>> scatter_to_tables(
    table_view const& input, column_view const& partition_map,
    rmm::mr::device_memory_resource* mr)
{
  // Pass the default stream (0) explicitly rather than via the default arg
  return detail::scatter_to_tables(input, partition_map, mr, 0);
}

} // namespace experimental
} // namespace cudf
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ set(COPYING_TEST_SRC
"${CMAKE_CURRENT_SOURCE_DIR}/copying/utility_tests.cu"
"${CMAKE_CURRENT_SOURCE_DIR}/copying/gather_tests.cu"
"${CMAKE_CURRENT_SOURCE_DIR}/copying/scatter_tests.cu"
"${CMAKE_CURRENT_SOURCE_DIR}/copying/scatter_to_tables_tests.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/copying/copy_range_tests.cu"
"${CMAKE_CURRENT_SOURCE_DIR}/copying/slice_tests.cu"
"${CMAKE_CURRENT_SOURCE_DIR}/copying/split_tests.cu"
Expand Down
Loading

0 comments on commit 409f560

Please sign in to comment.