Skip to content

Commit

Permalink
Join APIs that return gathermaps (#7454)
Browse files Browse the repository at this point in the history
Closes #6480 

# C++ changes

## TL;DR

* Adds join APIs that accept join keys and return gathermaps
* Return type is a `unique_ptr<rmm::device_uvector<size_type>>>` (rather than a `unique_ptr<column>`), to accommodate join results that can be larger than `INT32_MAX` rows
* Simplifies previous join APIs to not accept arguments relating to "common columns" -- instead, those APIs always return all the columns from the LHS/RHS. Users wanting finer control can use the gathermap-based APIs

## The problem

The work in this PR was motivated by the need for simpler join APIs that give the user more flexibility in how they want to construct the result of a join. To explain the current problem, consider the `inner_join` API:

```c++
std::unique_ptr<cudf::table> inner_join(
  cudf::table_view const& left,
  cudf::table_view const& right,
  std::vector<cudf::size_type> const& left_on,
  std::vector<cudf::size_type> const& right_on,
  std::vector<std::pair<cudf::size_type, cudf::size_type>> const& columns_in_common,
  null_equality compare_nulls         = null_equality::EQUAL,
  rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
```

In addition to the left and right tables (and corresponding keys), the API also accepts a `columns_in_common` argument. This is argument specifies pairs of columns from the LHS and RHS respectively, for which only a single column should appear in the result. That single column appears on the "left" side of the result. This makes the API somewhat complicated as well as inflexible.

There is a "lower-level" join API that gives more control on which side the "common" columns should go, by providing an additional `common_columns_output_side` argument:

```c++
  std::pair<std::unique_ptr<cudf::table>, std::unique_ptr<cudf::table>> inner_join(
    cudf::table_view const& probe,
    std::vector<size_type> const& probe_on,
    std::vector<std::pair<cudf::size_type, cudf::size_type>> const& columns_in_common,
    common_columns_output_side common_columns_output_side = common_columns_output_side::PROBE,
    null_equality compare_nulls                           = null_equality::EQUAL,
    rmm::cuda_stream_view stream                          = rmm::cuda_stream_default,
    rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const;
```

But even that offers only limited flexibility: for example, it doesn't allow the user to specify an arbitrary ordering of result columns, or omit columns altogether from the result.

## Proposed API

The proposed API in this PR is:

```c++
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
          std::unique_ptr<rmm::device_uvector<size_type>>>
inner_join(cudf::table_view const& left_keys,
           cudf::table_view const& right_keys,
           null_equality compare_nulls         = null_equality::EQUAL,
           rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
```

Note:

* Rather than requiring the full left and right tables of the join, this API only needs the key columns from the left and right tables.
* Rather than constructing the result of the join, this API returns the gathermaps which can be used to construct it.
* For outer join, non-matches are represented by out-of-bound values in the gathermap. In conjunction with the `out_of_bounds_policy::NULLIFY` argument to `gather`, this will produce nulls in the appropriate locations of the result table.
* The API returns a `std::unique_ptr<rmm::device_uvector>>` rather than just `rmm::device_uvector` because of a Cython limitation that prevents wrapping functions whose return types do not provide a nullary (default) constructor.
* The use of `rmm::device_uvector` allows the API to return results of size > `INT32_MAX`, which can occur easily in outer joins. 


# Python changes

## TL;DR

* Add Cython bindings for the new C++ APIs
* Rework join internals to interface with the new Cython APIs

## Changes/Improvements

### _Indexer

One major change introduced in the join internals is the use of a new type `_Indexer` to represent a key column.

Previously, join keys were represented by a numeric offset. This was for two reasons:

* A join key could be either an index column or a data column, and the only way to refer to it unambiguously was by its offset -- a DataFrame can have an index column and a data column with the same name.
* The C++ API required numeric offsets for the `left_on` and `right_on` arguments

`_Indexer` provides a more convenient way to construct and represent join keys by allowing one to refer unambiguosly to an index or data column of a `Frame`:

```
    # >>> df
    #    a
    # b
    # 4  1
    # 5  2
    # 6  3
    # >>> _Indexer("a", column=True).get(df)  # returns column "a" of df
    # >>> _Indexer("b", index=True).get(df)  # returns index level "b" of df
```

### Casting logic

Some of the casting logic has been simplified since we no longer need to post-process (cast) the result returned by libcudf. Previously, we were accounting for `"right"` joins in our casting functions. But, since a right join is implemented in terms of a left join with the operands reversed, it turns out we never really needed to handle right joins separately. I have removed that and it simplifies casting logic further.

### Others

* Renamed `casting_logic.py` to `_join_helpers.py` and included other join utilities there.
* Added a subclass of `Merge` for handling semi/anti joins
* Added a `assert_join_results_equal` helper to compare join results between Pandas and cuDF. libcudf can return join results with arbitrary row ordering, and we weren't accounting for that in some of our tests previously. I'm a bit surprised we never ran into any test failures :)

Authors:
  - Ashwin Srinath (@shwina)
  - Vyas Ramasubramani (@vyasr)

Approvers:
  - Jake Hemstad (@jrhemstad)
  - Keith Kraus (@kkraus14)
  - Mike Wilson (@hyperbolic2346)
  - @brandon-b-miller
  - Mark Harris (@harrism)

URL: #7454
  • Loading branch information
shwina authored Mar 30, 2021
1 parent 56976fa commit 563edfa
Show file tree
Hide file tree
Showing 29 changed files with 1,998 additions and 3,070 deletions.
8 changes: 2 additions & 6 deletions cpp/benchmarks/join/join_benchmark.cu
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,8 @@ static void BM_join(benchmark::State &state)
for (auto _ : state) {
cuda_event_timer raii(state, true, 0);

auto result = cudf::inner_join(probe_table,
build_table,
columns_to_join,
columns_to_join,
{{0, 0}},
cudf::null_equality::UNEQUAL);
auto result = cudf::inner_join(
probe_table, build_table, columns_to_join, columns_to_join, cudf::null_equality::UNEQUAL);
}
}

Expand Down
440 changes: 252 additions & 188 deletions cpp/include/cudf/join.hpp

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions cpp/include/cudf/table/table_view.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ class table_view_base {
*/
size_type num_rows() const noexcept { return _num_rows; }

/**
* @brief Returns true if `num_columns()` returns zero, or false otherwise
*/
size_type is_empty() const noexcept { return num_columns() == 0; }

table_view_base() = default;

~table_view_base() = default;
Expand Down
4 changes: 1 addition & 3 deletions cpp/src/copying/gather.cu
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ std::unique_ptr<table> gather(table_view const& source_table,

if (neg_indices == negative_index_policy::ALLOWED) {
cudf::size_type n_rows = source_table.num_rows();
auto idx_converter = [n_rows] __device__(size_type in) {
return ((in % n_rows) + n_rows) % n_rows;
};
auto idx_converter = [n_rows] __device__(size_type in) { return in < 0 ? in + n_rows : in; };
return gather(source_table,
thrust::make_transform_iterator(map_begin, idx_converter),
thrust::make_transform_iterator(map_end, idx_converter),
Expand Down
499 changes: 131 additions & 368 deletions cpp/src/join/hash_join.cu

Large diffs are not rendered by default.

143 changes: 52 additions & 91 deletions cpp/src/join/hash_join.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
#pragma once

#include <cudf/detail/concatenate.cuh>
#include <cudf/detail/gather.cuh>
#include <cudf/detail/gather.hpp>
#include <join/join_common_utils.hpp>
#include <join/join_kernels.cuh>

Expand All @@ -25,7 +28,7 @@
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_vector.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/sequence.h>
Expand Down Expand Up @@ -178,19 +181,29 @@ size_type estimate_join_output_size(table_device_view build_table,
*
* @param left Table of left columns to join
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the result
*
* @return Join output indices vector pair
*/
inline std::pair<rmm::device_vector<size_type>, rmm::device_vector<size_type>>
get_trivial_left_join_indices(table_view const& left, rmm::cuda_stream_view stream)
inline std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
get_trivial_left_join_indices(
table_view const& left,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
{
rmm::device_vector<size_type> left_indices(left.num_rows());
thrust::sequence(rmm::exec_policy(stream), left_indices.begin(), left_indices.end(), 0);
rmm::device_vector<size_type> right_indices(left.num_rows());
thrust::fill(rmm::exec_policy(stream), right_indices.begin(), right_indices.end(), JoinNoneValue);
auto left_indices = std::make_unique<rmm::device_uvector<size_type>>(left.num_rows(), stream, mr);
thrust::sequence(rmm::exec_policy(stream), left_indices->begin(), left_indices->end(), 0);
auto right_indices =
std::make_unique<rmm::device_uvector<size_type>>(left.num_rows(), stream, mr);
thrust::fill(
rmm::exec_policy(stream), right_indices->begin(), right_indices->end(), JoinNoneValue);
return std::make_pair(std::move(left_indices), std::move(right_indices));
}

std::pair<std::unique_ptr<table>, std::unique_ptr<table>> get_empty_joined_table(
table_view const& probe, table_view const& build);

std::unique_ptr<cudf::table> combine_table_pair(std::unique_ptr<cudf::table>&& left,
std::unique_ptr<cudf::table>&& right);

Expand All @@ -207,106 +220,52 @@ struct hash_join::hash_join_impl {

private:
cudf::table_view _build;
cudf::table_view _build_selected;
std::vector<size_type> _build_on;
std::unique_ptr<cudf::detail::multimap_type, std::function<void(cudf::detail::multimap_type*)>>
_hash_table;

public:
/**
* @brief Constructor that internally builds the hash table based on the given `build` table and
* column indices specified by `build_on` for subsequent probe calls.
* @brief Constructor that internally builds the hash table based on the given `build` table
*
* @throw cudf::logic_error if the number of columns in `build` table is 0.
* @throw cudf::logic_error if the number of rows in `build` table exceeds MAX_JOIN_SIZE.
* @throw std::out_of_range if elements of `build_on` exceed the number of columns in the `build`
* table.
*
* @param build The build table, from which the hash table is built.
* @param build_on The column indices from `build` to join on.
* @param compare_nulls Controls whether null join-key values should match or not.
*/
hash_join_impl(cudf::table_view const& build,
std::vector<size_type> const& build_on,
null_equality compare_nulls,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

std::pair<std::unique_ptr<cudf::table>, std::unique_ptr<cudf::table>> inner_join(
cudf::table_view const& probe,
std::vector<size_type> const& probe_on,
std::vector<std::pair<cudf::size_type, cudf::size_type>> const& columns_in_common,
common_columns_output_side common_columns_output_side,
null_equality compare_nulls,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr) const;

std::unique_ptr<cudf::table> left_join(
cudf::table_view const& probe,
std::vector<size_type> const& probe_on,
std::vector<std::pair<cudf::size_type, cudf::size_type>> const& columns_in_common,
null_equality compare_nulls,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr) const;

std::unique_ptr<cudf::table> full_join(
cudf::table_view const& probe,
std::vector<size_type> const& probe_on,
std::vector<std::pair<cudf::size_type, cudf::size_type>> const& columns_in_common,
null_equality compare_nulls,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr) const;
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
inner_join(cudf::table_view const& probe,
null_equality compare_nulls,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr) const;

std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
left_join(cudf::table_view const& probe,
null_equality compare_nulls,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr) const;

std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
full_join(cudf::table_view const& probe,
null_equality compare_nulls,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr) const;

private:
/**
* @brief Performs hash join by probing the columns provided in `probe` as per
* the joining indices given in `probe_on` and returns a (`probe`, `_build`) table pair, which
* contains the probe and build portions of the logical joined table respectively.
*
* @throw cudf::logic_error if `columns_in_common` contains a pair of indices
* (`P`, `B`) where `P` does not exist in `probe_on` or `B` does not exist in
* `_build_on`.
* @throw cudf::logic_error if `columns_in_common` contains a pair of indices
* (`P`, `B`) such that the location of `P` within `probe_on` is not equal to
* the location of `B` within `_build_on`.
* @throw cudf::logic_error if the number of elements in `probe_on` and
* `_build_on` are not equal.
* @throw cudf::logic_error if the number of columns in `probe` is 0.
* @throw cudf::logic_error if the number of rows in `probe` table exceeds MAX_JOIN_SIZE.
* @throw std::out_of_range if elements of `probe_on` exceed the number of columns in the `probe`
* table.
* @throw cudf::logic_error if types do not match between joining columns.
*
* @tparam JoinKind The type of join to be performed.
*
* @param probe The probe table.
* @param probe_on The column's indices from `probe` to join on.
* Column `i` from `probe_on` will be compared against column `i` of `_build_on`.
* @param columns_in_common is a vector of pairs of column indices into
* `probe` and `_build`, respectively, that are "in common". For "common"
* columns, only a single output column will be produced, which is gathered
* from `probe_on` columns. Else, for every column in `probe_on` and `_build_on`,
* an output column will be produced. For each of these pairs (P, B), P
* should exist in `probe_on` and B should exist in `_build_on`.
* @param common_columns_output_side @see cudf::hash_join::common_columns_output_side.
* @param compare_nulls Controls whether null join-key values should match or not.
* @param mr Device memory resource used to allocate the returned table's device memory.
* @param stream CUDA stream used for device memory operations and kernel launches.
*
* @return Table pair of (`probe`, `_build`) of joining both tables on the columns
* specified by `probe_on` and `_build_on`. The resulting table pair will be joined columns of
* (`probe(including common columns)`, `_build(excluding common columns)`) if
* `common_columns_output_side` is `PROBE`, or (`probe(excluding common columns)`,
* `_build(including common columns)`) if `common_columns_output_side` is `BUILD`.
*/
template <cudf::detail::join_kind JoinKind>
std::pair<std::unique_ptr<cudf::table>, std::unique_ptr<cudf::table>> compute_hash_join(
cudf::table_view const& probe,
std::vector<size_type> const& probe_on,
std::vector<std::pair<cudf::size_type, cudf::size_type>> const& columns_in_common,
common_columns_output_side common_columns_output_side,
null_equality compare_nulls,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr) const;
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
compute_hash_join(cudf::table_view const& probe,
null_equality compare_nulls,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr) const;

/**
* @brief Probes the `_hash_table` built from `_build` for tuples in `probe_table`,
Expand All @@ -320,15 +279,17 @@ struct hash_join::hash_join_impl {
* @param probe_table Table of probe side columns to join.
* @param compare_nulls Controls whether null join-key values should match or not.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned vectors.
*
* @return Join output indices vector pair.
*/
template <cudf::detail::join_kind JoinKind>
std::enable_if_t<JoinKind != cudf::detail::join_kind::FULL_JOIN,
std::pair<rmm::device_vector<size_type>, rmm::device_vector<size_type>>>
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
probe_join_indices(cudf::table_view const& probe,
null_equality compare_nulls,
rmm::cuda_stream_view stream) const;
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr) const;
};

} // namespace cudf
Loading

0 comments on commit 563edfa

Please sign in to comment.