Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/branch-0.19' into 6873
Browse files Browse the repository at this point in the history
  • Loading branch information
galipremsagar committed Mar 24, 2021
2 parents ce784bb + e73fff0 commit 5072e6b
Show file tree
Hide file tree
Showing 8 changed files with 390 additions and 163 deletions.
77 changes: 42 additions & 35 deletions cpp/src/lists/explode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ std::unique_ptr<table> explode_outer(table_view const& input_table,
});
thrust::inclusive_scan(rmm::exec_policy(stream),
null_or_empty,
null_or_empty + sliced_child.size(),
null_or_empty + explode_col.size(),
null_or_empty_offset.begin());

auto null_or_empty_count =
Expand All @@ -209,41 +209,48 @@ std::unique_ptr<table> explode_outer(table_view const& input_table,
// offsets + 1 here to skip the 0th offset, which removes a - 1 operation later.
auto offsets_minus_one = thrust::make_transform_iterator(
thrust::next(offsets), [offsets] __device__(auto i) { return (i - offsets[0]) - 1; });

auto fill_gather_maps = [offsets_minus_one,
gather_map_p = gather_map.begin(),
explode_col_gather_map_p = explode_col_gather_map.begin(),
position_array = pos.begin(),
sliced_child_size = sliced_child.size(),
null_or_empty_offset_p = null_or_empty_offset.begin(),
include_position,
offsets,
null_or_empty,
offset_size = explode_col.offsets().size() - 1] __device__(auto idx) {
if (idx < sliced_child_size) {
auto lb_idx =
thrust::distance(offsets_minus_one,
thrust::lower_bound(
thrust::seq, offsets_minus_one, offsets_minus_one + (offset_size), idx));
auto index_to_write = null_or_empty_offset_p[lb_idx] + idx;
gather_map_p[index_to_write] = lb_idx;
explode_col_gather_map_p[index_to_write] = idx;
if (include_position) {
position_array[index_to_write] = idx - (offsets[lb_idx] - offsets[0]);
}
}
if (null_or_empty[idx]) {
auto invalid_index = null_or_empty_offset_p[idx] == 0
? offsets[idx]
: offsets[idx] + null_or_empty_offset_p[idx] - 1;
gather_map_p[invalid_index] = idx;

// negative one to indicate a null value
explode_col_gather_map_p[invalid_index] = -1;
if (include_position) { position_array[invalid_index] = 0; }
}
};

// we need to do this loop at least explode_col times or we may not properly fill in null and
// empty entries.
auto loop_count = std::max(sliced_child.size(), explode_col.size());

// Fill in gather map with all the child column's entries
thrust::for_each(rmm::exec_policy(stream),
counting_iter,
counting_iter + sliced_child.size(),
[offsets_minus_one,
gather_map = gather_map.begin(),
explode_col_gather_map = explode_col_gather_map.begin(),
position_array = pos.begin(),
include_position,
offsets,
null_or_empty_offset = null_or_empty_offset.begin(),
null_or_empty,
offset_size = explode_col.offsets().size() - 1] __device__(auto idx) {
auto lb_idx = thrust::distance(
offsets_minus_one,
thrust::lower_bound(
thrust::seq, offsets_minus_one, offsets_minus_one + (offset_size), idx));
auto index_to_write = null_or_empty_offset[lb_idx] + idx;
gather_map[index_to_write] = lb_idx;
explode_col_gather_map[index_to_write] = idx;
if (include_position) {
position_array[index_to_write] = idx - (offsets[lb_idx] - offsets[0]);
}
if (null_or_empty[idx]) {
auto invalid_index = null_or_empty_offset[idx] == 0
? offsets[idx]
: offsets[idx] + null_or_empty_offset[idx] - 1;
gather_map[invalid_index] = idx;

// negative one to indicate a null value
explode_col_gather_map[invalid_index] = -1;

if (include_position) { position_array[invalid_index] = 0; }
}
});
thrust::for_each(
rmm::exec_policy(stream), counting_iter, counting_iter + loop_count, fill_gather_maps);

return build_table(
input_table,
Expand Down
Loading

0 comments on commit 5072e6b

Please sign in to comment.