Skip to content

Commit

Permalink
Add JSON option to prune columns (#14996)
Browse files Browse the repository at this point in the history
Resolves #14951
This adds an option `prune_columns` to `json_reader_options` (default: False).
When set to True, the `dtypes` option is used as a filter instead of a type-inference suggestion. If `dtypes` (a vector of dtypes, a map of dtypes, or a nested schema) is not specified, the output is an empty dataframe.

Authors:
  - Karthikeyan (https://github.com/karthikeyann)

Approvers:
  - Robert (Bobby) Evans (https://github.com/revans2)
  - MithunR (https://github.com/mythrocks)
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Shruti Shivakumar (https://github.com/shrshi)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #14996
  • Loading branch information
karthikeyann authored May 2, 2024
1 parent 2ee0219 commit 2fccbc0
Show file tree
Hide file tree
Showing 9 changed files with 377 additions and 55 deletions.
40 changes: 40 additions & 0 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class json_reader_options {
bool _lines = false;
// Parse mixed types as a string column
bool _mixed_types_as_string = false;
// Prune columns on read, selected based on the _dtypes option
bool _prune_columns = false;

// Bytes to skip from the start
size_t _byte_range_offset = 0;
Expand Down Expand Up @@ -241,6 +243,17 @@ class json_reader_options {
*/
bool is_enabled_mixed_types_as_string() const { return _mixed_types_as_string; }

/**
 * @brief Whether to prune columns on read, selected based on the @ref set_dtypes option.
 *
 * When set as true, if the reader options include @ref set_dtypes, then
 * the reader will only return those columns which are mentioned in @ref set_dtypes.
 * If false, then all columns are returned, independent of the @ref set_dtypes setting.
 *
 * @return True if column pruning is enabled
 */
[[nodiscard]] bool is_enabled_prune_columns() const { return _prune_columns; }

/**
* @brief Whether to parse dates as DD/MM versus MM/DD.
*
Expand Down Expand Up @@ -342,6 +355,17 @@ class json_reader_options {
*/
void enable_mixed_types_as_string(bool val) { _mixed_types_as_string = val; }

/**
 * @brief Enables/disables column pruning on read, driven by the @ref set_dtypes option.
 *
 * When enabled, and the reader options include @ref set_dtypes, only the columns
 * mentioned in @ref set_dtypes are returned by the reader. When disabled, every
 * column is returned, independent of the @ref set_dtypes setting.
 *
 * @param val Boolean value to enable/disable column pruning
 */
void enable_prune_columns(bool val)
{
  _prune_columns = val;
}

/**
* @brief Set whether to parse dates as DD/MM versus MM/DD.
*
Expand Down Expand Up @@ -508,6 +532,22 @@ class json_reader_options_builder {
return *this;
}

/**
 * @brief Sets whether to prune columns on read, selected based on the @ref dtypes option.
 *
 * When enabled, and the reader options include @ref dtypes, only the columns
 * mentioned in @ref dtypes are returned by the reader. When disabled, every
 * column is returned, independent of the @ref dtypes setting.
 *
 * @param val Boolean value to enable/disable column pruning
 * @return this for chaining
 */
json_reader_options_builder& prune_columns(bool val)
{
  options._prune_columns = val;
  return *this;
}

/**
* @brief Set whether to parse dates as DD/MM versus MM/DD.
*
Expand Down
143 changes: 98 additions & 45 deletions cpp/src/io/json/json_column.cu
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ void make_device_json_column(device_span<SymbolT const> input,
}
};
auto init_to_zero = [stream](auto& v) {
thrust::uninitialized_fill(rmm::exec_policy(stream), v.begin(), v.end(), 0);
thrust::uninitialized_fill(rmm::exec_policy_nosync(stream), v.begin(), v.end(), 0);
};

auto initialize_json_columns = [&](auto i, auto& col) {
Expand Down Expand Up @@ -625,13 +625,14 @@ void make_device_json_column(device_span<SymbolT const> input,
// find column_ids which are values, but should be ignored in validity
std::vector<uint8_t> ignore_vals(num_columns, 0);
std::vector<uint8_t> is_mixed_type_column(num_columns, 0);
std::vector<uint8_t> is_pruned(num_columns, 0);
columns.try_emplace(parent_node_sentinel, std::ref(root));

for (auto const this_col_id : unique_col_ids) {
if (column_categories[this_col_id] == NC_ERR || column_categories[this_col_id] == NC_FN) {
continue;
}
// Struct, List, String, Value
auto name_and_parent_index = [&is_array_of_arrays,
&row_array_parent_col_id,
&column_parent_ids,
&column_categories,
&column_names](auto this_col_id) {
std::string name = "";
auto parent_col_id = column_parent_ids[this_col_id];
if (parent_col_id == parent_node_sentinel || column_categories[parent_col_id] == NC_LIST) {
Expand All @@ -647,11 +648,46 @@ void make_device_json_column(device_span<SymbolT const> input,
} else {
CUDF_FAIL("Unexpected parent column category");
}
return std::pair{name, parent_col_id};
};

// Prune columns that are not required to be parsed.
if (options.is_enabled_prune_columns()) {
for (auto const this_col_id : unique_col_ids) {
if (column_categories[this_col_id] == NC_ERR || column_categories[this_col_id] == NC_FN) {
continue;
}
// Struct, List, String, Value
auto [name, parent_col_id] = name_and_parent_index(this_col_id);
// get path of this column, and get its dtype if present in options
auto const nt = tree_path.get_path(this_col_id);
std::optional<data_type> const user_dtype = get_path_data_type(nt, options);
if (!user_dtype.has_value() and parent_col_id != parent_node_sentinel) {
is_pruned[this_col_id] = 1;
continue;
} else {
// make sure all its parents are not pruned.
while (parent_col_id != parent_node_sentinel and is_pruned[parent_col_id] == 1) {
is_pruned[parent_col_id] = 0;
parent_col_id = column_parent_ids[parent_col_id];
}
}
}
}

// Build the column tree, also, handles mixed types.
for (auto const this_col_id : unique_col_ids) {
if (column_categories[this_col_id] == NC_ERR || column_categories[this_col_id] == NC_FN) {
continue;
}
// Struct, List, String, Value
auto [name, parent_col_id] = name_and_parent_index(this_col_id);

if (parent_col_id != parent_node_sentinel && is_mixed_type_column[parent_col_id] == 1) {
// if parent is mixed type column, ignore this column.
is_mixed_type_column[this_col_id] = 1;
ignore_vals[this_col_id] = 1;
// if parent is mixed type column or this column is pruned, ignore this column.
if (parent_col_id != parent_node_sentinel &&
(is_mixed_type_column[parent_col_id] || is_pruned[this_col_id])) {
ignore_vals[this_col_id] = 1;
if (is_mixed_type_column[parent_col_id]) { is_mixed_type_column[this_col_id] = 1; }
continue;
}

Expand Down Expand Up @@ -714,12 +750,13 @@ void make_device_json_column(device_span<SymbolT const> input,
"A mix of lists and structs within the same column is not supported");
}
}

if (is_enabled_mixed_types_as_string) {
// get path of this column, check if it is a struct forced as string, and enforce it
auto nt = tree_path.get_path(this_col_id);
std::optional<data_type> user_dt = get_path_data_type(nt, options);
if (column_categories[this_col_id] == NC_STRUCT and user_dt.has_value() and
user_dt.value().id() == type_id::STRING) {
auto const nt = tree_path.get_path(this_col_id);
std::optional<data_type> const user_dtype = get_path_data_type(nt, options);
if (column_categories[this_col_id] == NC_STRUCT and user_dtype.has_value() and
user_dtype.value().id() == type_id::STRING) {
is_mixed_type_column[this_col_id] = 1;
column_categories[this_col_id] = NC_STR;
}
Expand Down Expand Up @@ -873,25 +910,27 @@ void make_device_json_column(device_span<SymbolT const> input,
for (auto& [id, col_ref] : columns) {
auto& col = col_ref.get();
if (col.type == json_col_t::StringColumn) {
thrust::inclusive_scan(rmm::exec_policy(stream),
thrust::inclusive_scan(rmm::exec_policy_nosync(stream),
col.string_offsets.begin(),
col.string_offsets.end(),
col.string_offsets.begin(),
thrust::maximum<json_column::row_offset_t>{});
} else if (col.type == json_col_t::ListColumn) {
thrust::inclusive_scan(rmm::exec_policy(stream),
thrust::inclusive_scan(rmm::exec_policy_nosync(stream),
col.child_offsets.begin(),
col.child_offsets.end(),
col.child_offsets.begin(),
thrust::maximum<json_column::row_offset_t>{});
}
}
stream.synchronize();
}

std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_column_to_cudf_column(
device_json_column& json_col,
device_span<SymbolT const> d_input,
cudf::io::parse_options const& options,
bool prune_columns,
std::optional<schema_element> schema,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
Expand Down Expand Up @@ -982,13 +1021,16 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
for (auto const& col_name : json_col.column_order) {
auto const& col = json_col.child_columns.find(col_name);
column_names.emplace_back(col->first);
auto& child_col = col->second;
auto [child_column, names] = device_json_column_to_cudf_column(
child_col, d_input, options, get_child_schema(col_name), stream, mr);
CUDF_EXPECTS(num_rows == child_column->size(),
"All children columns must have the same size");
child_columns.push_back(std::move(child_column));
column_names.back().children = names;
auto& child_col = col->second;
auto child_schema_element = get_child_schema(col_name);
if (!prune_columns or child_schema_element.has_value()) {
auto [child_column, names] = device_json_column_to_cudf_column(
child_col, d_input, options, prune_columns, child_schema_element, stream, mr);
CUDF_EXPECTS(num_rows == child_column->size(),
"All children columns must have the same size");
child_columns.push_back(std::move(child_column));
column_names.back().children = names;
}
}
auto [result_bitmask, null_count] = make_validity(json_col);
// The null_mask is set after creation of struct column is to skip the superimpose_nulls and
Expand All @@ -1011,8 +1053,11 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
rmm::device_buffer{},
0);
// Create children column
auto child_schema_element = json_col.child_columns.empty()
? std::optional<schema_element>{}
: get_child_schema(json_col.child_columns.begin()->first);
auto [child_column, names] =
json_col.child_columns.empty()
json_col.child_columns.empty() or (prune_columns and !child_schema_element.has_value())
? std::pair<std::unique_ptr<column>,
// EMPTY type could not used because gather throws exception on EMPTY type.
std::vector<column_name_info>>{std::make_unique<column>(
Expand All @@ -1022,13 +1067,13 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
rmm::device_buffer{},
0),
std::vector<column_name_info>{}}
: device_json_column_to_cudf_column(
json_col.child_columns.begin()->second,
d_input,
options,
get_child_schema(json_col.child_columns.begin()->first),
stream,
mr);
: device_json_column_to_cudf_column(json_col.child_columns.begin()->second,
d_input,
options,
prune_columns,
child_schema_element,
stream,
mr);
column_names.back().children = names;
auto [result_bitmask, null_count] = make_validity(json_col);
auto ret_col = make_lists_column(num_rows,
Expand Down Expand Up @@ -1140,8 +1185,6 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
size_type column_index = 0;
for (auto const& col_name : root_struct_col.column_order) {
auto& json_col = root_struct_col.child_columns.find(col_name)->second;
// Insert this columns name into the schema
out_column_names.emplace_back(col_name);

std::optional<schema_element> child_schema_element = std::visit(
cudf::detail::visitor_overload{
Expand Down Expand Up @@ -1184,18 +1227,28 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
debug_schema_print(child_schema_element);
#endif

// Get this JSON column's cudf column and schema info, (modifies json_col)
auto [cudf_col, col_name_info] = device_json_column_to_cudf_column(
json_col, d_input, parse_opt, child_schema_element, stream, mr);
// TODO: RangeIndex as DataFrame.columns names for array of arrays
// if (is_array_of_arrays) {
// col_name_info.back().name = "";
// }

out_column_names.back().children = std::move(col_name_info);
out_columns.emplace_back(std::move(cudf_col));

column_index++;
if (!options.is_enabled_prune_columns() or child_schema_element.has_value()) {
// Get this JSON column's cudf column and schema info, (modifies json_col)
auto [cudf_col, col_name_info] =
device_json_column_to_cudf_column(json_col,
d_input,
parse_opt,
options.is_enabled_prune_columns(),
child_schema_element,
stream,
mr);
// Insert this column's name into the schema
out_column_names.emplace_back(col_name);
// TODO: RangeIndex as DataFrame.columns names for array of arrays
// if (is_array_of_arrays) {
// col_name_info.back().name = "";
// }

out_column_names.back().children = std::move(col_name_info);
out_columns.emplace_back(std::move(cudf_col));

column_index++;
}
}

return table_with_metadata{std::make_unique<table>(std::move(out_columns)), {out_column_names}};
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> input,
* @return data type of the column if present
*/
std::optional<data_type> get_path_data_type(
host_span<std::pair<std::string, cudf::io::json::NodeT>> path,
host_span<std::pair<std::string, cudf::io::json::NodeT> const> path,
cudf::io::json_reader_options const& options);

/**
Expand Down
15 changes: 11 additions & 4 deletions cpp/src/io/json/parser_features.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,15 @@ std::optional<schema_element> child_schema_element(std::string const& col_name,
// "a": [ null] {"a", list}, {"element", str}
// back() is root.
// front() is leaf.
/**
* @brief Get the path data type of a column by path if present in input schema
*
* @param path path of the json column
* @param root root of input schema element
* @return data type of the column if present, otherwise std::nullopt
*/
std::optional<data_type> get_path_data_type(
host_span<std::pair<std::string, cudf::io::json::NodeT>> path, schema_element const& root)
host_span<std::pair<std::string, cudf::io::json::NodeT> const> path, schema_element const& root)
{
if (path.empty() || path.size() == 1) {
return root.type;
Expand All @@ -81,7 +88,7 @@ std::optional<data_type> get_path_data_type(
}

std::optional<data_type> get_path_data_type(
host_span<std::pair<std::string, cudf::io::json::NodeT>> path,
host_span<std::pair<std::string, cudf::io::json::NodeT> const> path,
cudf::io::json_reader_options const& options)
{
if (path.empty()) return {};
Expand All @@ -98,11 +105,11 @@ std::optional<data_type> get_path_data_type(
std::vector<path_from_tree::path_rep> path_from_tree::get_path(NodeIndexT this_col_id)
{
std::vector<path_rep> path;
// TODO Need to stop at row root. so, how to find row root?
// stops at root.
while (this_col_id != parent_node_sentinel) {
auto type = column_categories[this_col_id];
std::string name = "";
// TODO make this ifelse into a separate lambda function, along with parent_col_id.
// code same as name_and_parent_index lambda.
auto parent_col_id = column_parent_ids[this_col_id];
if (parent_col_id == parent_node_sentinel || column_categories[parent_col_id] == NC_LIST) {
if (is_array_of_arrays && parent_col_id == row_array_parent_col_id) {
Expand Down
Loading

0 comments on commit 2fccbc0

Please sign in to comment.