diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 12353ff9fda..a62641b10c2 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -299,6 +299,7 @@ add_library(cudf src/jit/cache.cpp src/jit/parser.cpp src/jit/type.cpp + src/join/conditional_join.cu src/join/cross_join.cu src/join/hash_join.cu src/join/join.cu diff --git a/cpp/include/cudf/ast/detail/linearizer.hpp b/cpp/include/cudf/ast/detail/linearizer.hpp index 67474e08877..59eda1df7b7 100644 --- a/cpp/include/cudf/ast/detail/linearizer.hpp +++ b/cpp/include/cudf/ast/detail/linearizer.hpp @@ -16,11 +16,9 @@ #pragma once #include -#include #include #include #include -#include namespace cudf { namespace ast { @@ -108,7 +106,7 @@ class linearizer { * @param right The right table used for evaluating the abstract syntax tree. */ linearizer(detail::node const& expr, cudf::table_view left, cudf::table_view right) - : _left(left), _right(right), _node_count(0), _intermediate_counter() + : _left{left}, _right{right}, _node_count{0}, _intermediate_counter{} { expr.accept(*this); } @@ -120,7 +118,7 @@ class linearizer { * @param table The table used for evaluating the abstract syntax tree. */ linearizer(detail::node const& expr, cudf::table_view table) - : _left(table), _right(table), _node_count(0), _intermediate_counter() + : _left{table}, _right{table}, _node_count{0}, _intermediate_counter{} { expr.accept(*this); } diff --git a/cpp/include/cudf/ast/detail/transform.cuh b/cpp/include/cudf/ast/detail/transform.cuh index e56b4fb2281..89fa7d31980 100644 --- a/cpp/include/cudf/ast/detail/transform.cuh +++ b/cpp/include/cudf/ast/detail/transform.cuh @@ -33,9 +33,6 @@ #include -#include -#include - namespace cudf { namespace ast { diff --git a/cpp/include/cudf/join.hpp b/cpp/include/cudf/join.hpp index 725c0fc3699..d0d2083b85b 100644 --- a/cpp/include/cudf/join.hpp +++ b/cpp/include/cudf/join.hpp @@ -658,6 +658,9 @@ class hash_join { * The corresponding values in the second returned vector are * the matched row indices from the right table. * + * If the provided predicate returns NULL for a pair of rows + * (left, right), that pair is not included in the output. + * * @code{.pseudo} * Left: {{0, 1, 2}} * Right: {{1, 2, 3}} @@ -672,6 +675,7 @@ class hash_join { * * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` * mismatch. + * @throw cudf::logic_error if the binary predicate outputs a non-boolean result. * * @param left The left table * @param right The right table @@ -702,6 +706,9 @@ conditional_inner_join( * from the right table, if there is a match or (2) an unspecified * out-of-bounds value. * + * If the provided predicate returns NULL for a pair of rows + * (left, right), that pair is not included in the output. + * * @code{.pseudo} * Left: {{0, 1, 2}} * Right: {{1, 2, 3}} @@ -716,6 +723,7 @@ conditional_inner_join( * * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` * mismatch. + * @throw cudf::logic_error if the binary predicate outputs a non-boolean result. * * @param left The left table * @param right The right table @@ -744,6 +752,9 @@ conditional_left_join(table_view left, * right tables, (2) a row index and an unspecified out-of-bounds value, * representing a row from one table without a match in the other. * + * If the provided predicate returns NULL for a pair of rows + * (left, right), that pair is not included in the output. + * * @code{.pseudo} * Left: {{0, 1, 2}} * Right: {{1, 2, 3}} @@ -758,6 +769,7 @@ conditional_left_join(table_view left, * * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` * mismatch. + * @throw cudf::logic_error if the binary predicate outputs a non-boolean result. * * @param left The left table * @param right The right table @@ -781,6 +793,9 @@ conditional_full_join(table_view left, * for which there exists some row in the right table where the predicate * evaluates to true. * + * If the provided predicate returns NULL for a pair of rows + * (left, right), that pair is not included in the output. + * * @code{.pseudo} * Left: {{0, 1, 2}} * Right: {{1, 2, 3}} @@ -795,6 +810,7 @@ conditional_full_join(table_view left, * * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` * mismatch. + * @throw cudf::logic_error if the binary predicate outputs a non-boolean result. * * @param left The left table * @param right The right table @@ -818,6 +834,9 @@ std::unique_ptr> conditional_left_semi_join( * for which there does not exist any row in the right table where the * predicate evaluates to true. * + * If the provided predicate returns NULL for a pair of rows + * (left, right), that pair is not included in the output. + * * @code{.pseudo} * Left: {{0, 1, 2}} * Right: {{1, 2, 3}} @@ -832,6 +851,7 @@ std::unique_ptr> conditional_left_semi_join( * * @throw cudf::logic_error if number of elements in `left_keys` or `right_keys` * mismatch. + * @throw cudf::logic_error if the binary predicate outputs a non-boolean result. * * @param left The left table * @param right The right table diff --git a/cpp/include/cudf/table/table_view.hpp b/cpp/include/cudf/table/table_view.hpp index 1ff701c3b01..81d6050b1c6 100644 --- a/cpp/include/cudf/table/table_view.hpp +++ b/cpp/include/cudf/table/table_view.hpp @@ -257,6 +257,11 @@ class mutable_table_view : public detail::table_view_base { mutable_table_view(std::vector const& views); }; +inline bool nullable(table_view const& view) +{ + return std::any_of(view.begin(), view.end(), [](auto const& col) { return col.nullable(); }); +} + inline bool has_nulls(table_view const& view) { return std::any_of(view.begin(), view.end(), [](auto const& col) { return col.has_nulls(); }); diff --git a/cpp/src/ast/transform.cu b/cpp/src/ast/transform.cu index 7aa89635c54..d6426f92002 100644 --- a/cpp/src/ast/transform.cu +++ b/cpp/src/ast/transform.cu @@ -27,18 +27,11 @@ #include #include #include -#include #include #include -#include #include -#include -#include -#include -#include - namespace cudf { namespace ast { namespace detail { @@ -93,11 +86,8 @@ std::unique_ptr compute_column(table_view const table, // If none of the input columns actually contain nulls, we can still use the // non-nullable version of the expression evaluation code path for // performance, so we capture that information as well. - auto const nullable = - std::any_of(table.begin(), table.end(), [](column_view c) { return c.nullable(); }); - auto const has_nulls = nullable && std::any_of(table.begin(), table.end(), [](column_view c) { - return c.nullable() && c.has_nulls(); - }); + auto const nullable = cudf::nullable(table); + auto const has_nulls = nullable && cudf::has_nulls(table); auto const plan = ast_plan{expr, table, has_nulls, stream, mr}; diff --git a/cpp/src/join/conditional_join.cu b/cpp/src/join/conditional_join.cu new file mode 100644 index 00000000000..c7a1630311b --- /dev/null +++ b/cpp/src/join/conditional_join.cu @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include +#include +#include + +#include + +namespace cudf { +namespace detail { + +std::pair>, + std::unique_ptr>> +conditional_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + join_kind JoinKind, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_FUNC_RANGE(); + return get_conditional_join_indices( + left, right, JoinKind, binary_predicate, compare_nulls, stream, mr); +} + +} // namespace detail + +std::pair>, + std::unique_ptr>> +conditional_inner_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::INNER_JOIN, + rmm::cuda_stream_default, + mr); +} + +std::pair>, + std::unique_ptr>> +conditional_left_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::LEFT_JOIN, + rmm::cuda_stream_default, + mr); +} + +std::pair>, + std::unique_ptr>> +conditional_full_join(table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::FULL_JOIN, + rmm::cuda_stream_default, + mr); +} + +std::unique_ptr> conditional_left_semi_join( + table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return std::move(detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::LEFT_SEMI_JOIN, + rmm::cuda_stream_default, + mr) + .first); +} + +std::unique_ptr> conditional_left_anti_join( + table_view left, + table_view right, + ast::expression binary_predicate, + null_equality compare_nulls, + rmm::mr::device_memory_resource* mr) +{ + return std::move(detail::conditional_join(left, + right, + binary_predicate, + compare_nulls, + detail::join_kind::LEFT_ANTI_JOIN, + rmm::cuda_stream_default, + mr) + .first); +} + +} // namespace cudf diff --git a/cpp/src/join/nested_loop_join.cuh b/cpp/src/join/conditional_join.cuh similarity index 91% rename from cpp/src/join/nested_loop_join.cuh rename to cpp/src/join/conditional_join.cuh index 9848477a894..4602b7fefaa 100644 --- a/cpp/src/join/nested_loop_join.cuh +++ b/cpp/src/join/conditional_join.cuh @@ -15,14 +15,13 @@ */ #pragma once -#include "hash_join.cuh" -#include "join_common_utils.hpp" -#include "join_kernels.cuh" +#include +#include +#include #include #include -#include -#include +#include #include #include #include @@ -31,10 +30,6 @@ #include #include -#include - -#include - namespace cudf { namespace detail { @@ -85,15 +80,8 @@ get_conditional_join_indices(table_view const& left, // If none of the input columns actually contain nulls, we can still use the // non-nullable version of the expression evaluation code path for // performance, so we capture that information as well. - auto const nullable = - std::any_of(left.begin(), left.end(), [](column_view c) { return c.nullable(); }) || - std::any_of(right.begin(), right.end(), [](column_view c) { return c.nullable(); }); - auto const has_nulls = - nullable && - (std::any_of( - left.begin(), left.end(), [](column_view c) { return c.nullable() && c.has_nulls(); }) || - std::any_of( - right.begin(), right.end(), [](column_view c) { return c.nullable() && c.has_nulls(); })); + auto const nullable = cudf::nullable(left) || cudf::nullable(right); + auto const has_nulls = nullable && (cudf::has_nulls(left) || cudf::has_nulls(right)); auto const plan = ast::detail::ast_plan{binary_pred, left, right, has_nulls, stream, mr}; CUDF_EXPECTS(plan.output_type().id() == type_id::BOOL8, diff --git a/cpp/src/join/conditional_join_kernels.cuh b/cpp/src/join/conditional_join_kernels.cuh new file mode 100644 index 00000000000..3d34a49c5af --- /dev/null +++ b/cpp/src/join/conditional_join_kernels.cuh @@ -0,0 +1,241 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace cudf { +namespace detail { + +/** + * @brief Computes the output size of joining the left table to the right table. + * + * This method uses a nested loop to iterate over the left and right tables and count the number of + * matches according to a boolean expression. + * + * @tparam block_size The number of threads per block for this kernel + * @tparam has_nulls Whether or not the inputs may contain nulls. + * + * @param[in] left_table The left table + * @param[in] right_table The right table + * @param[in] JoinKind The type of join to be performed + * @param[in] compare_nulls Controls whether null join-key values should match or not. + * @param[in] plan Container of device data required to evaluate the desired expression. + * @param[out] output_size The resulting output size + */ +template +__global__ void compute_conditional_join_output_size(table_device_view left_table, + table_device_view right_table, + join_kind JoinKind, + null_equality compare_nulls, + ast::detail::device_ast_plan plan, + cudf::size_type* output_size) +{ + // The (required) extern storage of the shared memory array leads to + // conflicting declarations between different templates. The easiest + // workaround is to declare an arbitrary (here char) array type then cast it + // after the fact to the appropriate type. + extern __shared__ char raw_intermediate_storage[]; + cudf::ast::detail::IntermediateDataType* intermediate_storage = + reinterpret_cast*>(raw_intermediate_storage); + auto thread_intermediate_storage = &intermediate_storage[threadIdx.x * plan.num_intermediates]; + + cudf::size_type thread_counter(0); + const cudf::size_type left_start_idx = threadIdx.x + blockIdx.x * blockDim.x; + const cudf::size_type left_stride = blockDim.x * gridDim.x; + const cudf::size_type left_num_rows = left_table.num_rows(); + const cudf::size_type right_num_rows = right_table.num_rows(); + + auto evaluator = cudf::ast::detail::expression_evaluator( + left_table, right_table, plan, thread_intermediate_storage, compare_nulls); + + for (cudf::size_type left_row_index = left_start_idx; left_row_index < left_num_rows; + left_row_index += left_stride) { + bool found_match = false; + for (cudf::size_type right_row_index = 0; right_row_index < right_num_rows; right_row_index++) { + auto output_dest = cudf::ast::detail::value_expression_result(); + evaluator.evaluate(output_dest, left_row_index, right_row_index, 0); + if (output_dest.is_valid() && output_dest.value()) { + if ((JoinKind != join_kind::LEFT_ANTI_JOIN) && + !(JoinKind == join_kind::LEFT_SEMI_JOIN && found_match)) { + ++thread_counter; + } + found_match = true; + } + } + if ((JoinKind == join_kind::LEFT_JOIN || JoinKind == join_kind::LEFT_ANTI_JOIN || + JoinKind == join_kind::FULL_JOIN) && + (!found_match)) { + ++thread_counter; + } + } + + using BlockReduce = cub::BlockReduce; + __shared__ typename BlockReduce::TempStorage temp_storage; + cudf::size_type block_counter = BlockReduce(temp_storage).Sum(thread_counter); + + // Add block counter to global counter + if (threadIdx.x == 0) atomicAdd(output_size, block_counter); +} + +/** + * @brief Performs a join conditioned on a predicate to find all matching rows + * between the left and right tables and generate the output for the desired + * Join operation. + * + * @tparam block_size The number of threads per block for this kernel + * @tparam output_cache_size The side of the shared memory buffer to cache join + * output results + * @tparam has_nulls Whether or not the inputs may contain nulls. + * + * @param[in] left_table The left table + * @param[in] right_table The right table + * @param[in] JoinKind The type of join to be performed + * @param compare_nulls Controls whether null join-key values should match or not. + * @param[out] join_output_l The left result of the join operation + * @param[out] join_output_r The right result of the join operation + * @param[in,out] current_idx A global counter used by threads to coordinate + * writes to the global output + * @param plan Container of device data required to evaluate the desired expression. + * @param[in] max_size The maximum size of the output + */ +template +__global__ void conditional_join(table_device_view left_table, + table_device_view right_table, + join_kind JoinKind, + null_equality compare_nulls, + cudf::size_type* join_output_l, + cudf::size_type* join_output_r, + cudf::size_type* current_idx, + cudf::ast::detail::device_ast_plan plan, + const cudf::size_type max_size) +{ + constexpr int num_warps = block_size / detail::warp_size; + __shared__ cudf::size_type current_idx_shared[num_warps]; + __shared__ cudf::size_type join_shared_l[num_warps][output_cache_size]; + __shared__ cudf::size_type join_shared_r[num_warps][output_cache_size]; + + // Normally the casting of a shared memory array is used to create multiple + // arrays of different types from the shared memory buffer, but here it is + // used to circumvent conflicts between arrays of different types between + // different template instantiations due to the extern specifier. + extern __shared__ char raw_intermediate_storage[]; + cudf::ast::detail::IntermediateDataType* intermediate_storage = + reinterpret_cast*>(raw_intermediate_storage); + auto thread_intermediate_storage = &intermediate_storage[threadIdx.x * plan.num_intermediates]; + + const int warp_id = threadIdx.x / detail::warp_size; + const int lane_id = threadIdx.x % detail::warp_size; + const cudf::size_type left_num_rows = left_table.num_rows(); + const cudf::size_type right_num_rows = right_table.num_rows(); + + if (0 == lane_id) { current_idx_shared[warp_id] = 0; } + + __syncwarp(); + + cudf::size_type left_row_index = threadIdx.x + blockIdx.x * blockDim.x; + + const unsigned int activemask = __ballot_sync(0xffffffff, left_row_index < left_num_rows); + + auto evaluator = cudf::ast::detail::expression_evaluator( + left_table, right_table, plan, thread_intermediate_storage, compare_nulls); + + if (left_row_index < left_num_rows) { + bool found_match = false; + for (size_type right_row_index(0); right_row_index < right_num_rows; ++right_row_index) { + auto output_dest = cudf::ast::detail::value_expression_result(); + evaluator.evaluate(output_dest, left_row_index, right_row_index, 0); + + if (output_dest.is_valid() && output_dest.value()) { + // If the rows are equal, then we have found a true match + // In the case of left anti joins we only add indices from left after + // the loop if we have found _no_ matches from the right. + // In the case of left semi joins we only add the first match (note + // that the current logic relies on the fact that we process all right + // table rows for a single left table row on a single thread so that no + // synchronization of found_match is required). + if ((JoinKind != join_kind::LEFT_ANTI_JOIN) && + !(JoinKind == join_kind::LEFT_SEMI_JOIN && found_match)) { + add_pair_to_cache(left_row_index, + right_row_index, + current_idx_shared, + warp_id, + join_shared_l[warp_id], + join_shared_r[warp_id]); + } + found_match = true; + } + + __syncwarp(activemask); + // flush output cache if next iteration does not fit + if (current_idx_shared[warp_id] + detail::warp_size >= output_cache_size) { + flush_output_cache(activemask, + max_size, + warp_id, + lane_id, + current_idx, + current_idx_shared, + join_shared_l, + join_shared_r, + join_output_l, + join_output_r); + __syncwarp(activemask); + if (0 == lane_id) { current_idx_shared[warp_id] = 0; } + __syncwarp(activemask); + } + } + + // Left, left anti, and full joins all require saving left columns that + // aren't present in the right. + if ((JoinKind == join_kind::LEFT_JOIN || JoinKind == join_kind::LEFT_ANTI_JOIN || + JoinKind == join_kind::FULL_JOIN) && + (!found_match)) { + add_pair_to_cache(left_row_index, + static_cast(JoinNoneValue), + current_idx_shared, + warp_id, + join_shared_l[warp_id], + join_shared_r[warp_id]); + } + + // final flush of output cache + if (current_idx_shared[warp_id] > 0) { + flush_output_cache(activemask, + max_size, + warp_id, + lane_id, + current_idx, + current_idx_shared, + join_shared_l, + join_shared_r, + join_output_l, + join_output_r); + } + } +} + +} // namespace detail + +} // namespace cudf diff --git a/cpp/src/join/hash_join.cu b/cpp/src/join/hash_join.cu index e6110edfaa8..50cc479fcf4 100644 --- a/cpp/src/join/hash_join.cu +++ b/cpp/src/join/hash_join.cu @@ -40,121 +40,6 @@ std::pair, std::unique_ptr> get_empty_joined_table return std::make_pair(std::move(empty_probe), std::move(empty_build)); } -VectorPair concatenate_vector_pairs(VectorPair& a, VectorPair& b, rmm::cuda_stream_view stream) -{ - CUDF_EXPECTS((a.first->size() == a.second->size()), - "Mismatch between sizes of vectors in vector pair"); - CUDF_EXPECTS((b.first->size() == b.second->size()), - "Mismatch between sizes of vectors in vector pair"); - if (a.first->is_empty()) { - return std::move(b); - } else if (b.first->is_empty()) { - return std::move(a); - } - auto original_size = a.first->size(); - a.first->resize(a.first->size() + b.first->size(), stream); - a.second->resize(a.second->size() + b.second->size(), stream); - thrust::copy( - rmm::exec_policy(stream), b.first->begin(), b.first->end(), a.first->begin() + original_size); - thrust::copy(rmm::exec_policy(stream), - b.second->begin(), - b.second->end(), - a.second->begin() + original_size); - return std::move(a); -} - -template -struct valid_range { - T start, stop; - __host__ __device__ valid_range(const T begin, const T end) : start(begin), stop(end) {} - - __host__ __device__ __forceinline__ bool operator()(const T index) - { - return ((index >= start) && (index < stop)); - } -}; - -/** - * @brief Creates a table containing the complement of left join indices. - * This table has two columns. The first one is filled with JoinNoneValue(-1) - * and the second one contains values from 0 to right_table_row_count - 1 - * excluding those found in the right_indices column. - * - * @param right_indices Vector of indices - * @param left_table_row_count Number of rows of left table - * @param right_table_row_count Number of rows of right table - * @param stream CUDA stream used for device memory operations and kernel launches. - * @param mr Device memory resource used to allocate the returned vectors. - * - * @return Pair of vectors containing the left join indices complement - */ -std::pair>, - std::unique_ptr>> -get_left_join_indices_complement(std::unique_ptr>& right_indices, - size_type left_table_row_count, - size_type right_table_row_count, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - // Get array of indices that do not appear in right_indices - - // Vector allocated for unmatched result - auto right_indices_complement = - std::make_unique>(right_table_row_count, stream); - - // If left table is empty in a full join call then all rows of the right table - // should be represented in the joined indices. This is an optimization since - // if left table is empty and full join is called all the elements in - // right_indices will be JoinNoneValue, i.e. -1. This if path should - // produce exactly the same result as the else path but will be faster. - if (left_table_row_count == 0) { - thrust::sequence(rmm::exec_policy(stream), - right_indices_complement->begin(), - right_indices_complement->end(), - 0); - } else { - // Assume all the indices in invalid_index_map are invalid - auto invalid_index_map = - std::make_unique>(right_table_row_count, stream); - thrust::uninitialized_fill( - rmm::exec_policy(stream), invalid_index_map->begin(), invalid_index_map->end(), int32_t{1}); - - // Functor to check for index validity since left joins can create invalid indices - valid_range valid(0, right_table_row_count); - - // invalid_index_map[index_ptr[i]] = 0 for i = 0 to right_table_row_count - // Thus specifying that those locations are valid - thrust::scatter_if(rmm::exec_policy(stream), - thrust::make_constant_iterator(0), - thrust::make_constant_iterator(0) + right_indices->size(), - right_indices->begin(), // Index locations - right_indices->begin(), // Stencil - Check if index location is valid - invalid_index_map->begin(), // Output indices - valid); // Stencil Predicate - size_type begin_counter = static_cast(0); - size_type end_counter = static_cast(right_table_row_count); - - // Create list of indices that have been marked as invalid - size_type indices_count = thrust::copy_if(rmm::exec_policy(stream), - thrust::make_counting_iterator(begin_counter), - thrust::make_counting_iterator(end_counter), - invalid_index_map->begin(), - right_indices_complement->begin(), - thrust::identity()) - - right_indices_complement->begin(); - right_indices_complement->resize(indices_count, stream); - } - - auto left_invalid_indices = - std::make_unique>(right_indices_complement->size(), stream); - thrust::fill(rmm::exec_policy(stream), - left_invalid_indices->begin(), - left_invalid_indices->end(), - JoinNoneValue); - - return std::make_pair(std::move(left_invalid_indices), std::move(right_indices_complement)); -} - /** * @brief Builds the hash table based on the given `build_table`. * diff --git a/cpp/src/join/hash_join.cuh b/cpp/src/join/hash_join.cuh index 1b4cbf4ba1d..dd21a22803b 100644 --- a/cpp/src/join/hash_join.cuh +++ b/cpp/src/join/hash_join.cuh @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -119,51 +120,12 @@ std::size_t compute_join_output_size(table_device_view build_table, return h_size; } -/** - * @brief Computes the trivial left join operation for the case when the - * right table is empty. In this case all the valid indices of the left table - * are returned with their corresponding right indices being set to - * JoinNoneValue, i.e. -1. - * - * @param left Table of left columns to join - * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource used to allocate the result - * - * @return Join output indices vector pair - */ -inline std::pair>, - std::unique_ptr>> -get_trivial_left_join_indices( - table_view const& left, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) -{ - auto left_indices = std::make_unique>(left.num_rows(), stream, mr); - thrust::sequence(rmm::exec_policy(stream), left_indices->begin(), left_indices->end(), 0); - auto right_indices = - std::make_unique>(left.num_rows(), stream, mr); - thrust::fill( - rmm::exec_policy(stream), right_indices->begin(), right_indices->end(), JoinNoneValue); - return std::make_pair(std::move(left_indices), std::move(right_indices)); -} - std::pair, std::unique_ptr
> get_empty_joined_table( table_view const& probe, table_view const& build); std::unique_ptr combine_table_pair(std::unique_ptr&& left, std::unique_ptr&& right); -VectorPair concatenate_vector_pairs(VectorPair& a, VectorPair& b, rmm::cuda_stream_view stream); - -std::pair>, - std::unique_ptr>> -get_left_join_indices_complement( - std::unique_ptr>& right_indices, - size_type left_table_row_count, - size_type right_table_row_count, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); - } // namespace detail struct hash_join::hash_join_impl { diff --git a/cpp/src/join/join.cu b/cpp/src/join/join.cu index cf711524f0b..526edbf6903 100644 --- a/cpp/src/join/join.cu +++ b/cpp/src/join/join.cu @@ -15,12 +15,11 @@ */ #include #include -#include -#include #include #include #include +#include #include @@ -220,21 +219,6 @@ std::unique_ptr
full_join(table_view const& left_input, return combine_table_pair(std::move(left_result), std::move(right_result)); } -std::pair>, - std::unique_ptr>> -conditional_join(table_view left, - table_view right, - ast::expression binary_predicate, - null_equality compare_nulls, - join_kind JoinKind, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - CUDF_FUNC_RANGE(); - return get_conditional_join_indices( - left, right, JoinKind, binary_predicate, compare_nulls, stream, mr); -} - } // namespace detail hash_join::~hash_join() = default; @@ -372,88 +356,4 @@ std::unique_ptr
full_join(table_view const& left, left, right, left_on, right_on, compare_nulls, rmm::cuda_stream_default, mr); } -std::pair>, - std::unique_ptr>> -conditional_inner_join(table_view left, - table_view right, - ast::expression binary_predicate, - null_equality compare_nulls, - rmm::mr::device_memory_resource* mr) -{ - return detail::conditional_join(left, - right, - binary_predicate, - compare_nulls, - detail::join_kind::INNER_JOIN, - rmm::cuda_stream_default, - mr); -} - -std::pair>, - std::unique_ptr>> -conditional_left_join(table_view left, - table_view right, - ast::expression binary_predicate, - null_equality compare_nulls, - rmm::mr::device_memory_resource* mr) -{ - return detail::conditional_join(left, - right, - binary_predicate, - compare_nulls, - detail::join_kind::LEFT_JOIN, - rmm::cuda_stream_default, - mr); -} - -std::pair>, - std::unique_ptr>> -conditional_full_join(table_view left, - table_view right, - ast::expression binary_predicate, - null_equality compare_nulls, - rmm::mr::device_memory_resource* mr) -{ - return detail::conditional_join(left, - right, - binary_predicate, - compare_nulls, - detail::join_kind::FULL_JOIN, - rmm::cuda_stream_default, - mr); -} - -std::unique_ptr> conditional_left_semi_join( - table_view left, - table_view right, - ast::expression binary_predicate, - null_equality compare_nulls, - rmm::mr::device_memory_resource* mr) -{ - return std::move(detail::conditional_join(left, - right, - binary_predicate, - compare_nulls, - detail::join_kind::LEFT_SEMI_JOIN, - rmm::cuda_stream_default, - mr) - .first); -} - -std::unique_ptr> conditional_left_anti_join( - table_view left, - table_view right, - ast::expression binary_predicate, - null_equality compare_nulls, - rmm::mr::device_memory_resource* mr) -{ - return std::move(detail::conditional_join(left, - right, - binary_predicate, - compare_nulls, - detail::join_kind::LEFT_ANTI_JOIN, - rmm::cuda_stream_default, - mr) - .first); -} } // namespace cudf diff --git a/cpp/src/join/join_common_utils.cuh b/cpp/src/join/join_common_utils.cuh new file mode 100644 index 00000000000..2b1c870bea1 --- /dev/null +++ b/cpp/src/join/join_common_utils.cuh @@ -0,0 +1,268 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include + +#include +#include +#include + +#include +#include + +namespace cudf { +namespace detail { + +/** + * @brief Computes the trivial left join operation for the case when the + * right table is empty. In this case all the valid indices of the left table + * are returned with their corresponding right indices being set to + * JoinNoneValue, i.e. -1. + * + * @param left Table of left columns to join + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the result + * + * @return Join output indices vector pair + */ +inline std::pair>, + std::unique_ptr>> +get_trivial_left_join_indices( + table_view const& left, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) +{ + auto left_indices = std::make_unique>(left.num_rows(), stream, mr); + thrust::sequence(rmm::exec_policy(stream), left_indices->begin(), left_indices->end(), 0); + auto right_indices = + std::make_unique>(left.num_rows(), stream, mr); + thrust::uninitialized_fill( + rmm::exec_policy(stream), right_indices->begin(), right_indices->end(), JoinNoneValue); + return std::make_pair(std::move(left_indices), std::move(right_indices)); +} + +// Convenient alias for a pair of unique pointers to device uvectors. +using VectorPair = std::pair>, + std::unique_ptr>>; + +/** + * @brief Takes two pairs of vectors and returns a single pair where the first + * element is a vector made from concatenating the first elements of both input + * pairs and the second element is a vector made from concatenating the second + * elements of both input pairs. + * + * This function's primary use is for computing the indices of a full join by + * first performing a left join, then separately getting the complementary + * right join indices, then finally calling this function to concatenate the + * results. In this case, each input VectorPair contains the left and right + * indices from a join. + * + * Note that this is a destructive operation, in that at least one of a or b + * will be invalidated (by a move) by this operation. Calling code should + * assume that neither input VectorPair is valid after this function executes. + * + * @param a The first pair of vectors. + * @param b The second pair of vectors. + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return A pair of vectors containing the concatenated output. + */ +inline VectorPair concatenate_vector_pairs(VectorPair& a, + VectorPair& b, + rmm::cuda_stream_view stream) +{ + CUDF_EXPECTS((a.first->size() == a.second->size()), + "Mismatch between sizes of vectors in vector pair"); + CUDF_EXPECTS((b.first->size() == b.second->size()), + "Mismatch between sizes of vectors in vector pair"); + if (a.first->is_empty()) { + return std::move(b); + } else if (b.first->is_empty()) { + return std::move(a); + } + auto original_size = a.first->size(); + a.first->resize(a.first->size() + b.first->size(), stream); + a.second->resize(a.second->size() + b.second->size(), stream); + thrust::copy( + rmm::exec_policy(stream), b.first->begin(), b.first->end(), a.first->begin() + original_size); + thrust::copy(rmm::exec_policy(stream), + b.second->begin(), + b.second->end(), + a.second->begin() + original_size); + return std::move(a); +} + +/** + * @brief Device functor to determine if an index is contained in a range. + */ +template +struct valid_range { + T start, stop; + __host__ __device__ valid_range(const T begin, const T end) : start(begin), stop(end) {} + + __host__ __device__ __forceinline__ bool operator()(const T index) + { + return ((index >= start) && (index < stop)); + } +}; + +/** + * @brief Creates a table containing the complement of left join indices. + * This table has two columns. The first one is filled with JoinNoneValue(-1) + * and the second one contains values from 0 to right_table_row_count - 1 + * excluding those found in the right_indices column. + * + * @param right_indices Vector of indices + * @param left_table_row_count Number of rows of left table + * @param right_table_row_count Number of rows of right table + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned vectors. + * + * @return Pair of vectors containing the left join indices complement + */ +inline std::pair>, + std::unique_ptr>> +get_left_join_indices_complement(std::unique_ptr>& right_indices, + size_type left_table_row_count, + size_type right_table_row_count, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + // Get array of indices that do not appear in right_indices + + // Vector allocated for unmatched result + auto right_indices_complement = + std::make_unique>(right_table_row_count, stream); + + // If left table is empty in a full join call then all rows of the right table + // should be represented in the joined indices. This is an optimization since + // if left table is empty and full join is called all the elements in + // right_indices will be JoinNoneValue, i.e. -1. This if path should + // produce exactly the same result as the else path but will be faster. + if (left_table_row_count == 0) { + thrust::sequence(rmm::exec_policy(stream), + right_indices_complement->begin(), + right_indices_complement->end(), + 0); + } else { + // Assume all the indices in invalid_index_map are invalid + auto invalid_index_map = + std::make_unique>(right_table_row_count, stream); + thrust::uninitialized_fill( + rmm::exec_policy(stream), invalid_index_map->begin(), invalid_index_map->end(), int32_t{1}); + + // Functor to check for index validity since left joins can create invalid indices + valid_range valid(0, right_table_row_count); + + // invalid_index_map[index_ptr[i]] = 0 for i = 0 to right_table_row_count + // Thus specifying that those locations are valid + thrust::scatter_if(rmm::exec_policy(stream), + thrust::make_constant_iterator(0), + thrust::make_constant_iterator(0) + right_indices->size(), + right_indices->begin(), // Index locations + right_indices->begin(), // Stencil - Check if index location is valid + invalid_index_map->begin(), // Output indices + valid); // Stencil Predicate + size_type begin_counter = static_cast(0); + size_type end_counter = static_cast(right_table_row_count); + + // Create list of indices that have been marked as invalid + size_type indices_count = thrust::copy_if(rmm::exec_policy(stream), + thrust::make_counting_iterator(begin_counter), + thrust::make_counting_iterator(end_counter), + invalid_index_map->begin(), + right_indices_complement->begin(), + thrust::identity()) - + right_indices_complement->begin(); + right_indices_complement->resize(indices_count, stream); + } + + auto left_invalid_indices = + std::make_unique>(right_indices_complement->size(), stream); + thrust::uninitialized_fill(rmm::exec_policy(stream), + left_invalid_indices->begin(), + left_invalid_indices->end(), + JoinNoneValue); + + return std::make_pair(std::move(left_invalid_indices), std::move(right_indices_complement)); +} + +/** + * @brief Adds a pair of indices to the shared memory cache + * + * @param[in] first The first index in the pair + * @param[in] second The second index in the pair + * @param[in,out] current_idx_shared Pointer to shared index that determines + * where in the shared memory cache the pair will be written + * @param[in] warp_id The ID of the warp of the calling the thread + * @param[out] joined_shared_l Pointer to the shared memory cache for left indices + * @param[out] joined_shared_r Pointer to the shared memory cache for right indices + */ +__inline__ __device__ void add_pair_to_cache(const size_type first, + const size_type second, + size_type* current_idx_shared, + const int warp_id, + size_type* joined_shared_l, + size_type* joined_shared_r) +{ + size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))}; + + // its guaranteed to fit into the shared cache + joined_shared_l[my_current_idx] = first; + joined_shared_r[my_current_idx] = second; +} + +template +__device__ void flush_output_cache(const unsigned int activemask, + const cudf::size_type max_size, + const int warp_id, + const int lane_id, + cudf::size_type* current_idx, + cudf::size_type current_idx_shared[num_warps], + size_type join_shared_l[num_warps][output_cache_size], + size_type join_shared_r[num_warps][output_cache_size], + size_type* join_output_l, + size_type* join_output_r) +{ + // count how many active threads participating here which could be less than warp_size + int num_threads = __popc(activemask); + cudf::size_type output_offset = 0; + + if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); } + + // No warp sync is necessary here because we are assuming that ShuffleIndex + // is internally using post-CUDA 9.0 synchronization-safe primitives + // (__shfl_sync instead of __shfl). __shfl is technically not guaranteed to + // be safe by the compiler because it is not required by the standard to + // converge divergent branches before executing. + output_offset = cub::ShuffleIndex(output_offset, 0, activemask); + + for (int shared_out_idx = lane_id; shared_out_idx < current_idx_shared[warp_id]; + shared_out_idx += num_threads) { + cudf::size_type thread_offset = output_offset + shared_out_idx; + if (thread_offset < max_size) { + join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx]; + join_output_r[thread_offset] = join_shared_r[warp_id][shared_out_idx]; + } + } +} + +} // namespace detail + +} // namespace cudf diff --git a/cpp/src/join/join_common_utils.hpp b/cpp/src/join/join_common_utils.hpp index 9312704f065..d2337e28ed4 100644 --- a/cpp/src/join/join_common_utils.hpp +++ b/cpp/src/join/join_common_utils.hpp @@ -33,9 +33,6 @@ constexpr int DEFAULT_JOIN_BLOCK_SIZE = 128; constexpr int DEFAULT_JOIN_CACHE_SIZE = 128; constexpr size_type JoinNoneValue = std::numeric_limits::min(); -using VectorPair = std::pair>, - std::unique_ptr>>; - using multimap_type = concurrent_unordered_multimap -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include -#include "join_common_utils.hpp" +#include #include +#include namespace cudf { namespace detail { -/** - * @brief Adds a pair of indices to the shared memory cache - * - * @param[in] first The first index in the pair - * @param[in] second The second index in the pair - * @param[in,out] current_idx_shared Pointer to shared index that determines - * where in the shared memory cache the pair will be written - * @param[in] warp_id The ID of the warp of the calling the thread - * @param[out] joined_shared_l Pointer to the shared memory cache for left indices - * @param[out] joined_shared_r Pointer to the shared memory cache for right indices - */ -__inline__ __device__ void add_pair_to_cache(const size_type first, - const size_type second, - size_type* current_idx_shared, - const int warp_id, - size_type* joined_shared_l, - size_type* joined_shared_r) -{ - size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))}; - - // its guaranteed to fit into the shared cache - joined_shared_l[my_current_idx] = first; - joined_shared_r[my_current_idx] = second; -} - /** * @brief Remaps a hash value to a new value if it is equal to the specified sentinel value. * @@ -205,107 +174,6 @@ __global__ void compute_join_output_size(multimap_type multi_map, if (threadIdx.x == 0) atomicAdd(output_size, block_counter); } -/** - * @brief Computes the output size of joining the left table to the right table. - * - * This method uses a nested loop to iterate over the left and right tables and count the number of - * matches according to a boolean expression. - * - * @tparam block_size The number of threads per block for this kernel - * @tparam has_nulls Whether or not the inputs may contain nulls. - * - * @param[in] left_table The left table - * @param[in] right_table The right table - * @param[in] JoinKind The type of join to be performed - * @param[in] compare_nulls Controls whether null join-key values should match or not. - * @param[in] plan Container of device data required to evaluate the desired expression. - * @param[out] output_size The resulting output size - */ -template -__global__ void compute_conditional_join_output_size(table_device_view left_table, - table_device_view right_table, - join_kind JoinKind, - null_equality compare_nulls, - ast::detail::device_ast_plan plan, - cudf::size_type* output_size) -{ - // The (required) extern storage of the shared memory array leads to - // conflicting declarations between different templates. The easiest - // workaround is to declare an arbitrary (here char) array type then cast it - // after the fact to the appropriate type. - extern __shared__ char raw_intermediate_storage[]; - cudf::ast::detail::IntermediateDataType* intermediate_storage = - reinterpret_cast*>(raw_intermediate_storage); - auto thread_intermediate_storage = &intermediate_storage[threadIdx.x * plan.num_intermediates]; - - cudf::size_type thread_counter(0); - const cudf::size_type left_start_idx = threadIdx.x + blockIdx.x * blockDim.x; - const cudf::size_type left_stride = blockDim.x * gridDim.x; - const cudf::size_type left_num_rows = left_table.num_rows(); - const cudf::size_type right_num_rows = right_table.num_rows(); - - auto evaluator = cudf::ast::detail::expression_evaluator( - left_table, right_table, plan, thread_intermediate_storage, compare_nulls); - - for (cudf::size_type left_row_index = left_start_idx; left_row_index < left_num_rows; - left_row_index += left_stride) { - bool found_match = false; - for (cudf::size_type right_row_index = 0; right_row_index < right_num_rows; right_row_index++) { - auto output_dest = cudf::ast::detail::value_expression_result(); - evaluator.evaluate(output_dest, left_row_index, right_row_index, 0); - if (output_dest.is_valid() && output_dest.value()) { - if ((JoinKind != join_kind::LEFT_ANTI_JOIN) && - !(JoinKind == join_kind::LEFT_SEMI_JOIN && found_match)) { - ++thread_counter; - } - found_match = true; - } - } - if ((JoinKind == join_kind::LEFT_JOIN || JoinKind == join_kind::LEFT_ANTI_JOIN || - JoinKind == join_kind::FULL_JOIN) && - (!found_match)) { - ++thread_counter; - } - } - - using BlockReduce = cub::BlockReduce; - __shared__ typename BlockReduce::TempStorage temp_storage; - cudf::size_type block_counter = BlockReduce(temp_storage).Sum(thread_counter); - - // Add block counter to global counter - if (threadIdx.x == 0) atomicAdd(output_size, block_counter); -} - -template -__device__ void flush_output_cache(const unsigned int activemask, - const cudf::size_type max_size, - const int warp_id, - const int lane_id, - cudf::size_type* current_idx, - cudf::size_type current_idx_shared[num_warps], - size_type join_shared_l[num_warps][output_cache_size], - size_type join_shared_r[num_warps][output_cache_size], - size_type* join_output_l, - size_type* join_output_r) -{ - // count how many active threads participating here which could be less than warp_size - int num_threads = __popc(activemask); - cudf::size_type output_offset = 0; - - if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); } - - output_offset = cub::ShuffleIndex(output_offset, 0, activemask); - - for (int shared_out_idx = lane_id; shared_out_idx < current_idx_shared[warp_id]; - shared_out_idx += num_threads) { - cudf::size_type thread_offset = output_offset + shared_out_idx; - if (thread_offset < max_size) { - join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx]; - join_output_r[thread_offset] = join_shared_r[warp_id][shared_out_idx]; - } - } -} - /** * @brief Probes the hash map with the probe table to find all matching rows * between the probe and hash table and generate the output for the desired @@ -457,142 +325,6 @@ __global__ void probe_hash_table(multimap_type multi_map, } } -/** - * @brief Performs a join conditioned on a predicate to find all matching rows - * between the left and right tables and generate the output for the desired - * Join operation. - * - * @tparam block_size The number of threads per block for this kernel - * @tparam output_cache_size The side of the shared memory buffer to cache join - * output results - * @tparam has_nulls Whether or not the inputs may contain nulls. - * - * @param[in] left_table The left table - * @param[in] right_table The right table - * @param[in] JoinKind The type of join to be performed - * @param compare_nulls Controls whether null join-key values should match or not. - * @param[out] join_output_l The left result of the join operation - * @param[out] join_output_r The right result of the join operation - * @param[in,out] current_idx A global counter used by threads to coordinate - * writes to the global output - * @param plan Container of device data required to evaluate the desired expression. - * @param[in] max_size The maximum size of the output - */ -template -__global__ void conditional_join(table_device_view left_table, - table_device_view right_table, - join_kind JoinKind, - null_equality compare_nulls, - cudf::size_type* join_output_l, - cudf::size_type* join_output_r, - cudf::size_type* current_idx, - cudf::ast::detail::device_ast_plan plan, - const cudf::size_type max_size) -{ - constexpr int num_warps = block_size / detail::warp_size; - __shared__ cudf::size_type current_idx_shared[num_warps]; - __shared__ cudf::size_type join_shared_l[num_warps][output_cache_size]; - __shared__ cudf::size_type join_shared_r[num_warps][output_cache_size]; - - // Normally the casting of a shared memory array is used to create multiple - // arrays of different types from the shared memory buffer, but here it is - // used to circumvent conflicts between arrays of different types between - // different template instantiations due to the extern specifier. - extern __shared__ char raw_intermediate_storage[]; - cudf::ast::detail::IntermediateDataType* intermediate_storage = - reinterpret_cast*>(raw_intermediate_storage); - auto thread_intermediate_storage = &intermediate_storage[threadIdx.x * plan.num_intermediates]; - - const int warp_id = threadIdx.x / detail::warp_size; - const int lane_id = threadIdx.x % detail::warp_size; - const cudf::size_type left_num_rows = left_table.num_rows(); - const cudf::size_type right_num_rows = right_table.num_rows(); - - if (0 == lane_id) { current_idx_shared[warp_id] = 0; } - - __syncwarp(); - - cudf::size_type left_row_index = threadIdx.x + blockIdx.x * blockDim.x; - - const unsigned int activemask = __ballot_sync(0xffffffff, left_row_index < left_num_rows); - - auto evaluator = cudf::ast::detail::expression_evaluator( - left_table, right_table, plan, thread_intermediate_storage, compare_nulls); - - if (left_row_index < left_num_rows) { - bool found_match = false; - for (size_type right_row_index(0); right_row_index < right_num_rows; ++right_row_index) { - auto output_dest = cudf::ast::detail::value_expression_result(); - evaluator.evaluate(output_dest, left_row_index, right_row_index, 0); - - if (output_dest.is_valid() && output_dest.value()) { - // If the rows are equal, then we have found a true match - // In the case of left anti joins we only add indices from left after - // the loop if we have found _no_ matches from the right. - // In the case of left semi joins we only add the first match (note - // that the current logic relies on the fact that we process all right - // table rows for a single left table row on a single thread so that no - // synchronization of found_match is required). - if ((JoinKind != join_kind::LEFT_ANTI_JOIN) && - !(JoinKind == join_kind::LEFT_SEMI_JOIN && found_match)) { - add_pair_to_cache(left_row_index, - right_row_index, - current_idx_shared, - warp_id, - join_shared_l[warp_id], - join_shared_r[warp_id]); - } - found_match = true; - } - - __syncwarp(activemask); - // flush output cache if next iteration does not fit - if (current_idx_shared[warp_id] + detail::warp_size >= output_cache_size) { - flush_output_cache(activemask, - max_size, - warp_id, - lane_id, - current_idx, - current_idx_shared, - join_shared_l, - join_shared_r, - join_output_l, - join_output_r); - __syncwarp(activemask); - if (0 == lane_id) { current_idx_shared[warp_id] = 0; } - __syncwarp(activemask); - } - } - - // Left, left anti, and full joins all require saving left columns that - // aren't present in the right. - if ((JoinKind == join_kind::LEFT_JOIN || JoinKind == join_kind::LEFT_ANTI_JOIN || - JoinKind == join_kind::FULL_JOIN) && - (!found_match)) { - add_pair_to_cache(left_row_index, - static_cast(JoinNoneValue), - current_idx_shared, - warp_id, - join_shared_l[warp_id], - join_shared_r[warp_id]); - } - - // final flush of output cache - if (current_idx_shared[warp_id] > 0) { - flush_output_cache(activemask, - max_size, - warp_id, - lane_id, - current_idx, - current_idx_shared, - join_shared_l, - join_shared_r, - join_output_l, - join_output_r); - } - } -} - } // namespace detail } // namespace cudf