Skip to content

Commit

Permalink
JSON tree algorithms refactor I: CSR data structure for column tree (#…
Browse files Browse the repository at this point in the history
…15979)

Part of #15903.
1. Introduces the Compressed Sparse Row (CSR) format to store the adjacency information of the column tree. 
2. Analogous to `reduce_to_column_tree`, `reduce_to_column_tree_csr` reduces the node tree representation to a column tree stored in CSR format.

TODO:
- [x] Correctness test

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)
  - Vukasin Milovanovic (https://github.com/vuule)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Robert (Bobby) Evans (https://github.com/revans2)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Nghia Truong (https://github.com/ttnghia)
  - Karthikeyan (https://github.com/karthikeyann)
  - Kyle Edwards (https://github.com/KyleFromNVIDIA)

URL: #15979
  • Loading branch information
shrshi authored Sep 25, 2024
1 parent f7c5d32 commit 987fea3
Show file tree
Hide file tree
Showing 6 changed files with 758 additions and 28 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ add_library(
src/io/functions.cpp
src/io/json/host_tree_algorithms.cu
src/io/json/json_column.cu
src/io/json/column_tree_construction.cu
src/io/json/json_normalization.cu
src/io/json/json_tree.cu
src/io/json/nested_json_gpu.cu
Expand Down
304 changes: 304 additions & 0 deletions cpp/src/io/json/column_tree_construction.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "nested_json.hpp"

#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <cuda/functional>
#include <thrust/for_each.h>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/permutation_iterator.h>
#include <thrust/iterator/transform_output_iterator.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/reduce.h>
#include <thrust/scan.h>
#include <thrust/sort.h>
#include <thrust/transform.h>
#include <thrust/transform_scan.h>
#include <thrust/unique.h>

namespace cudf::io::json {

using row_offset_t = size_type;

#ifdef CSR_DEBUG_PRINT
/**
 * @brief Debug helper that writes the contents of a device span to standard output.
 *
 * Output format is `<name> = e0 e1 ... ` followed by a newline.
 *
 * @tparam T Element type of the span
 * @param d_vec Device span whose elements are printed
 * @param name Label printed before the elements
 * @param stream CUDA stream that is synchronized and used for the device-to-host copy
 */
template <typename T>
void print(device_span<T const> d_vec, std::string name, rmm::cuda_stream_view stream)
{
  // Make sure all work queued on the stream has finished before reading the data back.
  stream.synchronize();
  auto const host_values = cudf::detail::make_std_vector_sync(d_vec, stream);

  std::cout << name << " = ";
  for (auto it = host_values.begin(); it != host_values.end(); ++it) {
    std::cout << *it << " ";
  }
  std::cout << std::endl;
}
#endif

namespace experimental::detail {

/**
 * @brief Strict ordering of nodes by (level, column id of parent, column id).
 *
 * Nodes are compared first on their level, then on the column id of their parent
 * (`parent_node_sentinel` when the node has no parent), and finally on their own column id.
 */
struct level_ordering {
  device_span<TreeDepthT const> node_levels;
  device_span<NodeIndexT const> col_ids;
  device_span<NodeIndexT const> parent_node_ids;
  __device__ bool operator()(NodeIndexT lhs_node_id, NodeIndexT rhs_node_id) const
  {
    // Primary key: node level
    auto const lhs_level = node_levels[lhs_node_id];
    auto const rhs_level = node_levels[rhs_node_id];
    if (lhs_level != rhs_level) { return lhs_level < rhs_level; }

    // Secondary key: column id of the parent, or the sentinel for parentless nodes
    auto const lhs_parent    = parent_node_ids[lhs_node_id];
    auto const rhs_parent    = parent_node_ids[rhs_node_id];
    auto const lhs_parent_col =
      lhs_parent == parent_node_sentinel ? parent_node_sentinel : col_ids[lhs_parent];
    auto const rhs_parent_col =
      rhs_parent == parent_node_sentinel ? parent_node_sentinel : col_ids[rhs_parent];
    if (lhs_parent_col != rhs_parent_col) { return lhs_parent_col < rhs_parent_col; }

    // Tertiary key: the node's own column id
    return col_ids[lhs_node_id] < col_ids[rhs_node_id];
  }
};

/**
 * @brief Translates a parent node id through `rev_mapped_col_ids`.
 *
 * `parent_node_sentinel` is passed through unchanged; every other id is used as an index into
 * `rev_mapped_col_ids`.
 */
struct parent_nodeids_to_colids {
  device_span<NodeIndexT const> rev_mapped_col_ids;
  __device__ auto operator()(NodeIndexT parent_node_id) -> NodeIndexT
  {
    // A node without a parent keeps the sentinel value
    if (parent_node_id == parent_node_sentinel) { return parent_node_sentinel; }
    return rev_mapped_col_ids[parent_node_id];
  }
};

/**
 * @brief Reduces node tree representation to column tree CSR representation.
 *
 * The column tree is first built with `cudf::io::json::detail::reduce_to_column_tree`. Its columns
 * are then reordered with `level_ordering` (level, parent column id, column id), and the adjacency
 * information of the reordered tree is stored in CSR form: `row_idx` holds the offsets into
 * `col_idx`, and the adjacency list of every non-root column starts with its parent followed by
 * its children. The root column only lists its children.
 *
 * @param node_tree Node tree representation of JSON string
 * @param original_col_ids Column ids of nodes
 * @param sorted_col_ids Forwarded unchanged to `cudf::io::json::detail::reduce_to_column_tree`
 * @param ordered_node_ids Forwarded unchanged to `cudf::io::json::detail::reduce_to_column_tree`
 * @param row_offsets Row offsets of nodes
 * @param is_array_of_arrays Whether the tree is an array of arrays
 * @param row_array_parent_col_id Column id of row array, if is_array_of_arrays is true
 * @param stream CUDA stream used for device memory operations and kernel launches
 * @return A tuple of the column tree adjacency in CSR format (row indices and column indices) and
 * the column tree properties (column categories, max row offsets of columns, and the column ids
 * in the reordered, level-wise order)
 */
std::tuple<compressed_sparse_row, column_tree_properties> reduce_to_column_tree(
  tree_meta_t& node_tree,
  device_span<NodeIndexT const> original_col_ids,
  device_span<NodeIndexT const> sorted_col_ids,
  device_span<NodeIndexT const> ordered_node_ids,
  device_span<row_offset_t const> row_offsets,
  bool is_array_of_arrays,
  NodeIndexT row_array_parent_col_id,
  rmm::cuda_stream_view stream)
{
  CUDF_FUNC_RANGE();

  // No nodes: return empty CSR arrays and empty column properties
  if (original_col_ids.empty()) {
    rmm::device_uvector<NodeIndexT> empty_row_idx(0, stream);
    rmm::device_uvector<NodeIndexT> empty_col_idx(0, stream);
    rmm::device_uvector<NodeT> empty_column_categories(0, stream);
    rmm::device_uvector<row_offset_t> empty_max_row_offsets(0, stream);
    rmm::device_uvector<NodeIndexT> empty_mapped_col_ids(0, stream);
    return std::tuple{compressed_sparse_row{std::move(empty_row_idx), std::move(empty_col_idx)},
                      column_tree_properties{std::move(empty_column_categories),
                                             std::move(empty_max_row_offsets),
                                             std::move(empty_mapped_col_ids)}};
  }

  auto [unpermuted_tree, unpermuted_col_ids, unpermuted_max_row_offsets] =
    cudf::io::json::detail::reduce_to_column_tree(node_tree,
                                                  original_col_ids,
                                                  sorted_col_ids,
                                                  ordered_node_ids,
                                                  row_offsets,
                                                  is_array_of_arrays,
                                                  row_array_parent_col_id,
                                                  stream);

  NodeIndexT num_columns = unpermuted_col_ids.size();

  auto mapped_col_ids = cudf::detail::make_device_uvector_async(
    unpermuted_col_ids, stream, cudf::get_current_device_resource_ref());
  rmm::device_uvector<NodeIndexT> rev_mapped_col_ids(num_columns, stream);
  rmm::device_uvector<NodeIndexT> reordering_index(unpermuted_col_ids.size(), stream);

  thrust::sequence(
    rmm::exec_policy_nosync(stream), reordering_index.begin(), reordering_index.end());
  // Reorder nodes and column ids in level-wise fashion
  thrust::sort_by_key(
    rmm::exec_policy_nosync(stream),
    reordering_index.begin(),
    reordering_index.end(),
    mapped_col_ids.begin(),
    level_ordering{
      unpermuted_tree.node_levels, unpermuted_col_ids, unpermuted_tree.parent_node_ids});

  {
    // Reverse mapping: sort a copy of the reordered column ids and carry their positions along,
    // so that rev_mapped_col_ids lists, in ascending column-id order, the position of each column
    // in the level-wise ordering.
    auto mapped_col_ids_copy = cudf::detail::make_device_uvector_async(
      mapped_col_ids, stream, cudf::get_current_device_resource_ref());
    thrust::sequence(
      rmm::exec_policy_nosync(stream), rev_mapped_col_ids.begin(), rev_mapped_col_ids.end());
    thrust::sort_by_key(rmm::exec_policy_nosync(stream),
                        mapped_col_ids_copy.begin(),
                        mapped_col_ids_copy.end(),
                        rev_mapped_col_ids.begin());
  }

  // Gather parent ids, max row offsets and categories in the level-wise order. Parent ids are
  // translated through rev_mapped_col_ids as they are written.
  rmm::device_uvector<NodeIndexT> parent_col_ids(num_columns, stream);
  thrust::transform_output_iterator parent_col_ids_it(parent_col_ids.begin(),
                                                      parent_nodeids_to_colids{rev_mapped_col_ids});
  rmm::device_uvector<row_offset_t> max_row_offsets(num_columns, stream);
  rmm::device_uvector<NodeT> column_categories(num_columns, stream);
  thrust::copy_n(
    rmm::exec_policy_nosync(stream),
    thrust::make_zip_iterator(thrust::make_permutation_iterator(
                                unpermuted_tree.parent_node_ids.begin(), reordering_index.begin()),
                              thrust::make_permutation_iterator(unpermuted_max_row_offsets.begin(),
                                                                reordering_index.begin()),
                              thrust::make_permutation_iterator(
                                unpermuted_tree.node_categories.begin(), reordering_index.begin())),
    num_columns,
    thrust::make_zip_iterator(
      parent_col_ids_it, max_row_offsets.begin(), column_categories.begin()));

#ifdef CSR_DEBUG_PRINT
  print<NodeIndexT>(reordering_index, "h_reordering_index", stream);
  print<NodeIndexT>(mapped_col_ids, "h_mapped_col_ids", stream);
  print<NodeIndexT>(rev_mapped_col_ids, "h_rev_mapped_col_ids", stream);
  print<NodeIndexT>(parent_col_ids, "h_parent_col_ids", stream);
  print<row_offset_t>(max_row_offsets, "h_max_row_offsets", stream);
#endif

  // Builds the CSR row offsets (num_columns + 1 entries, first entry zero).
  auto construct_row_idx = [&stream](NodeIndexT num_columns,
                                     device_span<NodeIndexT const> parent_col_ids) {
    auto row_idx = cudf::detail::make_zeroed_device_uvector_async<NodeIndexT>(
      static_cast<std::size_t>(num_columns + 1), stream, cudf::get_current_device_resource_ref());
    // Note that the first element of parent_col_ids is -1 (parent_node_sentinel), so it is
    // skipped when counting children.
    // children adjacency

    auto num_non_leaf_columns = thrust::unique_count(
      rmm::exec_policy_nosync(stream), parent_col_ids.begin() + 1, parent_col_ids.end());
    rmm::device_uvector<NodeIndexT> non_leaf_nodes(num_non_leaf_columns, stream);
    rmm::device_uvector<NodeIndexT> non_leaf_nodes_children(num_non_leaf_columns, stream);
    // Count the children of every non-leaf column. The keys are column ids, so the equality
    // predicate must be instantiated on NodeIndexT: comparing through TreeDepthT (the node-level
    // type) would convert the ids before comparing them and can disagree with the run count
    // computed by unique_count above.
    thrust::reduce_by_key(rmm::exec_policy_nosync(stream),
                          parent_col_ids.begin() + 1,
                          parent_col_ids.end(),
                          thrust::make_constant_iterator(1),
                          non_leaf_nodes.begin(),
                          non_leaf_nodes_children.begin(),
                          thrust::equal_to<NodeIndexT>());

    // row_idx[c + 1] = number of children of column c (zero for leaf columns)
    thrust::scatter(rmm::exec_policy_nosync(stream),
                    non_leaf_nodes_children.begin(),
                    non_leaf_nodes_children.end(),
                    non_leaf_nodes.begin(),
                    row_idx.begin() + 1);

    if (num_columns > 1) {
      // Prefix-sum the adjacency sizes. Every column except the first (n == 1) gets one extra
      // slot for its parent in addition to its children.
      thrust::transform_inclusive_scan(
        rmm::exec_policy_nosync(stream),
        thrust::make_zip_iterator(thrust::make_counting_iterator(1), row_idx.begin() + 1),
        thrust::make_zip_iterator(thrust::make_counting_iterator(1) + num_columns, row_idx.end()),
        row_idx.begin() + 1,
        cuda::proclaim_return_type<NodeIndexT>([] __device__(auto a) {
          auto n   = thrust::get<0>(a);
          auto idx = thrust::get<1>(a);
          return n == 1 ? idx : idx + 1;
        }),
        thrust::plus<NodeIndexT>{});
    } else {
      // Single column: use the synchronizing setter because the host value is a block-local
      // that goes out of scope right after this call; the asynchronous variant requires the
      // value to stay alive until the stream has been synchronized.
      NodeIndexT const single_node = 1;
      row_idx.set_element(1, single_node, stream);
    }

#ifdef CSR_DEBUG_PRINT
    print<NodeIndexT>(row_idx, "h_row_idx", stream);
#endif
    return row_idx;
  };

  // Builds the CSR column indices: 2 * (num_columns - 1) entries, i.e. one child entry and one
  // parent entry per non-root column.
  auto construct_col_idx = [&stream](NodeIndexT num_columns,
                                     device_span<NodeIndexT const> parent_col_ids,
                                     device_span<NodeIndexT const> row_idx) {
    rmm::device_uvector<NodeIndexT> col_idx((num_columns - 1) * 2, stream);
    thrust::fill(rmm::exec_policy_nosync(stream), col_idx.begin(), col_idx.end(), -1);
    // excluding root node, construct scatter map
    rmm::device_uvector<NodeIndexT> map(num_columns - 1, stream);
    // 1-based rank of every column among the consecutive columns sharing its parent
    thrust::inclusive_scan_by_key(rmm::exec_policy_nosync(stream),
                                  parent_col_ids.begin() + 1,
                                  parent_col_ids.end(),
                                  thrust::make_constant_iterator(1),
                                  map.begin());
    // Turn the rank into a position in col_idx. Children of column 0 start at offset 0, since
    // column 0 has no parent entry; children of any other column follow that column's parent
    // entry, which sits at row_idx[parent].
    thrust::for_each_n(rmm::exec_policy_nosync(stream),
                       thrust::make_counting_iterator(1),
                       num_columns - 1,
                       [row_idx        = row_idx.begin(),
                        map            = map.begin(),
                        parent_col_ids = parent_col_ids.begin()] __device__(auto i) {
                         auto parent_col_id = parent_col_ids[i];
                         if (parent_col_id == 0)
                           --map[i - 1];
                         else
                           map[i - 1] += row_idx[parent_col_id];
                       });
    // Write every non-root column into its parent's adjacency list
    thrust::scatter(rmm::exec_policy_nosync(stream),
                    thrust::make_counting_iterator(1),
                    thrust::make_counting_iterator(1) + num_columns - 1,
                    map.begin(),
                    col_idx.begin());

    // Skip the parent of root node; every other column stores its parent as the first entry of
    // its own adjacency list
    thrust::scatter(rmm::exec_policy_nosync(stream),
                    parent_col_ids.begin() + 1,
                    parent_col_ids.end(),
                    row_idx.begin() + 1,
                    col_idx.begin());

#ifdef CSR_DEBUG_PRINT
    print<NodeIndexT>(col_idx, "h_col_idx", stream);
#endif

    return col_idx;
  };

  /*
    5. CSR construction:
      a. Sort column levels and get their ordering
      b. For each column node coln iterated according to sorted_column_levels; do
        i. Find nodes that have coln as the parent node -> set adj_coln
        ii. row idx[coln] = size of adj_coln + 1
        iii. col idx[coln] = adj_coln U {parent_col_id[coln]}
  */
  auto row_idx = construct_row_idx(num_columns, parent_col_ids);
  auto col_idx = construct_col_idx(num_columns, parent_col_ids, row_idx);

  return std::tuple{
    compressed_sparse_row{std::move(row_idx), std::move(col_idx)},
    column_tree_properties{
      std::move(column_categories), std::move(max_row_offsets), std::move(mapped_col_ids)}};
}

} // namespace experimental::detail
} // namespace cudf::io::json
Loading

0 comments on commit 987fea3

Please sign in to comment.