Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add option to nullify empty lines #17028

Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ class json_reader_options {
// Normalize unquoted spaces and tabs
bool _normalize_whitespace = false;

// Nullify empty lines (JSON lines format with recovery mode)
bool _nullify_empty_lines = false;

// Whether to recover after an invalid JSON line
json_recovery_mode_t _recovery_mode = json_recovery_mode_t::FAIL;

Expand Down Expand Up @@ -313,6 +315,13 @@ class json_reader_options {
*/
[[nodiscard]] bool is_enabled_normalize_whitespace() const { return _normalize_whitespace; }

/**
 * @brief Queries whether empty lines are nullified when reading the JSON lines format with
 * recovery mode.
 *
 * @returns `true` if the reader nullifies empty lines, `false` otherwise
 */
[[nodiscard]] bool is_nullify_empty_lines() const
{
  return _nullify_empty_lines;
}

/**
* @brief Queries the JSON reader's behavior on invalid JSON lines.
*
Expand Down Expand Up @@ -502,6 +511,14 @@ class json_reader_options {
*/
void enable_normalize_whitespace(bool val) { _normalize_whitespace = val; }

/**
 * @brief Enables or disables nullification of empty lines when reading the JSON lines format
 * with recovery mode.
 *
 * @param val `true` to make the reader nullify empty lines, `false` otherwise
 */
void nullify_empty_lines(bool val)
{
  _nullify_empty_lines = val;
}

/**
* @brief Specifies the JSON reader's behavior on invalid JSON lines.
*
Expand Down Expand Up @@ -779,6 +796,19 @@ class json_reader_options_builder {
return *this;
}

/**
 * @brief Enables or disables nullification of empty lines when reading the JSON lines format
 * with recovery mode.
 *
 * @param val `true` to make the reader nullify empty lines, `false` otherwise
 * @return this for chaining
 */
json_reader_options_builder& nullify_empty_lines(bool val)
{
  // Forward to the options' public setter, which stores the flag.
  options.nullify_empty_lines(val);
  return *this;
}

/**
* @brief Specifies the JSON reader's behavior on invalid JSON lines.
*
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,15 @@ void get_stack_context(device_span<SymbolT const> json_in,
*
* @param tokens The tokens to be post-processed
* @param token_indices The tokens' corresponding indices that are post-processed
* @param nullify_empty_lines If true, the token filter handles an empty line (a delimiter seen
* while still in the filter's start state) the same way as the end of an invalid line
* @param stream The cuda stream to dispatch GPU kernels to
* @return Returns the post-processed token stream
*/
CUDF_EXPORT
std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> process_token_stream(
device_span<PdaTokenT const> tokens,
device_span<SymbolOffsetT const> token_indices,
bool nullify_empty_lines,
rmm::cuda_stream_view stream);

/**
Expand Down
42 changes: 26 additions & 16 deletions cpp/src/io/json/nested_json_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,12 @@ using SymbolGroupT = uint8_t;
/**
* @brief Definition of the DFA's states
*/
enum class dfa_states : StateT { VALID, INVALID, NUM_STATES };
enum class dfa_states : StateT { START, VALID, INVALID, NUM_STATES };

// Aliases for readability of the transition table
constexpr auto TT_INV = dfa_states::INVALID;
constexpr auto TT_VLD = dfa_states::VALID;
constexpr auto TT_START = dfa_states::START;
constexpr auto TT_INV = dfa_states::INVALID;
constexpr auto TT_VLD = dfa_states::VALID;

/**
* @brief Definition of the symbol groups
Expand Down Expand Up @@ -238,14 +239,17 @@ struct UnwrapTokenFromSymbolOp {
* invalid lines.
*/
struct TransduceToken {
bool nullify_empty_lines;
Comment on lines 241 to +242
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this imply any performance hit? Please run a benchmark with this change. If there is any slowdown, we probably need to make this a template argument (sacrificing some compile time) so that the code can be optimized out when it is false.

template <typename RelativeOffsetT, typename SymbolT>
constexpr CUDF_HOST_DEVICE SymbolT operator()(StateT const state_id,
SymbolGroupT const match_id,
RelativeOffsetT const relative_offset,
SymbolT const read_symbol) const
{
bool const is_empty_invalid =
(nullify_empty_lines && state_id == static_cast<StateT>(TT_START));
bool const is_end_of_invalid_line =
(state_id == static_cast<StateT>(TT_INV) &&
((state_id == static_cast<StateT>(TT_INV) or is_empty_invalid) &&
match_id == static_cast<SymbolGroupT>(dfa_symbol_group_id::DELIMITER));

if (is_end_of_invalid_line) {
Expand All @@ -265,14 +269,17 @@ struct TransduceToken {
constexpr int32_t num_inv_tokens = 2;

bool const is_delimiter = match_id == static_cast<SymbolGroupT>(dfa_symbol_group_id::DELIMITER);
bool const is_empty_invalid =
(nullify_empty_lines && state_id == static_cast<StateT>(TT_START));

// If state is either invalid or we're entering an invalid state, we discard tokens
bool const is_part_of_invalid_line =
(match_id != static_cast<SymbolGroupT>(dfa_symbol_group_id::ERROR) &&
state_id == static_cast<StateT>(TT_VLD));
(state_id == static_cast<StateT>(TT_VLD) or state_id == static_cast<StateT>(TT_START)));

// Indicates whether we transition from an invalid line to a potentially valid line
bool const is_end_of_invalid_line = (state_id == static_cast<StateT>(TT_INV) && is_delimiter);
bool const is_end_of_invalid_line =
((state_id == static_cast<StateT>(TT_INV) or is_empty_invalid) && is_delimiter);

int32_t const emit_count =
is_end_of_invalid_line ? num_inv_tokens : (is_part_of_invalid_line && !is_delimiter ? 1 : 0);
Expand All @@ -283,11 +290,12 @@ struct TransduceToken {
// Transition table
std::array<std::array<dfa_states, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const transition_table{
{/* IN_STATE ERROR DELIM OTHER */
/* VALID */ {{TT_INV, TT_VLD, TT_VLD}},
/* INVALID */ {{TT_INV, TT_VLD, TT_INV}}}};
/* START */ {{TT_INV, TT_START, TT_VLD}},
/* VALID */ {{TT_INV, TT_START, TT_VLD}},
/* INVALID */ {{TT_INV, TT_START, TT_INV}}}};

// The DFA's starting state
constexpr auto start_state = static_cast<StateT>(TT_VLD);
constexpr auto start_state = static_cast<StateT>(TT_START);
} // namespace token_filter

// JSON to stack operator DFA (Deterministic Finite Automata)
Expand Down Expand Up @@ -1506,17 +1514,19 @@ void get_stack_context(device_span<SymbolT const> json_in,
std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> process_token_stream(
device_span<PdaTokenT const> tokens,
device_span<SymbolOffsetT const> token_indices,
bool nullify_empty_lines,
rmm::cuda_stream_view stream)
{
// Instantiate FST for post-processing the token stream to remove all tokens that belong to an
// invalid JSON line
token_filter::UnwrapTokenFromSymbolOp sgid_op{};
using symbol_t = thrust::tuple<PdaTokenT, SymbolOffsetT>;
auto filter_fst = fst::detail::make_fst(
fst::detail::make_symbol_group_lut(token_filter::symbol_groups, sgid_op),
fst::detail::make_transition_table(token_filter::transition_table),
fst::detail::make_translation_functor<symbol_t, 0, 2>(token_filter::TransduceToken{}),
stream);
using symbol_t = thrust::tuple<PdaTokenT, SymbolOffsetT>;
auto filter_fst =
fst::detail::make_fst(fst::detail::make_symbol_group_lut(token_filter::symbol_groups, sgid_op),
fst::detail::make_transition_table(token_filter::transition_table),
fst::detail::make_translation_functor<symbol_t, 0, 2>(
token_filter::TransduceToken{nullify_empty_lines}),
stream);

auto const mr = cudf::get_current_device_resource_ref();
cudf::detail::device_scalar<SymbolOffsetT> d_num_selected_tokens(stream, mr);
Expand Down Expand Up @@ -1663,7 +1673,7 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> ge
tokens.set_element(0, token_t::LineEnd, stream);
validate_token_stream(json_in, tokens, tokens_indices, options, stream);
auto [filtered_tokens, filtered_tokens_indices] =
process_token_stream(tokens, tokens_indices, stream);
process_token_stream(tokens, tokens_indices, options.is_nullify_empty_lines(), stream);
tokens = std::move(filtered_tokens);
tokens_indices = std::move(filtered_tokens_indices);
}
Expand Down
141 changes: 141 additions & 0 deletions cpp/src/io/json/read_json.cu
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,27 @@
#include "io/json/nested_json.hpp"
#include "read_json.hpp"

#include <cudf/column/column_device_view.cuh>
#include <cudf/concatenate.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/io/detail/json.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <cub/device/device_copy.cuh>
#include <cub/device/device_histogram.cuh>
#include <cuda/std/span>
#include <thrust/distance.h>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/iterator/transform_output_iterator.h>
#include <thrust/scatter.h>

#include <numeric>
Expand Down Expand Up @@ -238,6 +244,26 @@ table_with_metadata read_batch(host_span<std::unique_ptr<datasource>> sources,
return device_parse_nested_json(buffer, reader_opts, stream, mr);
}

/**
 * @brief Checks whether a character may be used as the delimiter between JSON lines.
 *
 * JSON structural characters, both quote characters, the backslash, and the whitespace
 * characters space, tab and carriage return are rejected; every other character is accepted.
 *
 * @param c The candidate delimiter character
 * @return `false` if `c` is one of the rejected characters, `true` otherwise
 */
constexpr bool can_be_delimiter(char c)
{
  // The character list below is from `json_reader_options.set_delimiter`.
  switch (c) {
    case '{':
    case '[':
    case '}':
    case ']':
    case ',':
    case ':':
    case '"':
    case '\'':
    case '\\':
    case ' ':
    case '\t':
    case '\r': return false;
    default: return true;
  }
}

} // anonymous namespace

device_span<char> ingest_raw_input(device_span<char> buffer,
Expand Down Expand Up @@ -442,4 +468,119 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
{partial_tables[0].metadata.schema_info}};
}

/**
 * @brief Joins the rows of a strings column into one device buffer, with a single delimiter
 * character written between consecutive rows.
 *
 * The delimiter is the first character, searching upwards from `\0`, whose count in a histogram
 * of the input characters is zero and for which `can_be_delimiter` returns true.
 *
 * NOTE(review): the returned buffer is constructed without `mr`; only the temporary histogram is
 * allocated from it. Confirm whether the result is meant to use `mr`.
 * NOTE(review): the offsets child is read as `cudf::size_type` and indexed from its first
 * element; presumably the input is an unsliced column with 32-bit offsets -- TODO confirm.
 *
 * @param input Strings column whose rows are concatenated
 * @param stream CUDA stream used for the device allocations and the thrust/CUB calls
 * @param mr Device memory resource used to allocate the temporary histogram
 * @return Tuple of the concatenated device buffer and the chosen delimiter character
 * @throws std::logic_error if no character with a zero count is accepted by `can_be_delimiter`
 */
std::tuple<rmm::device_buffer, char> preprocess(cudf::strings_column_view const& input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
// NOTE(review): CUB documents `num_levels` as the number of bin boundaries (i.e. num_levels - 1
// bins) and `upper_level` as exclusive; confirm that the largest `char` value is still counted.
auto constexpr num_levels = 256;
auto constexpr lower_level = std::numeric_limits<char>::min();
auto constexpr upper_level = std::numeric_limits<char>::max();
auto const num_chars = input.chars_size(stream);

char delimiter;
{
// Histogram of the input characters; zero-initialized so untouched bins read as "not present".
auto histogram =
cudf::detail::make_zeroed_device_uvector_async<uint32_t>(num_levels, stream, mr);
// First call (null temp storage) only queries the required temporary storage size.
size_t temp_storage_bytes = 0;
cub::DeviceHistogram::HistogramEven(nullptr,
temp_storage_bytes,
input.chars_begin(stream),
histogram.begin(),
num_levels,
lower_level,
upper_level,
num_chars,
stream.value());
// Second call, with the temporary storage allocated, computes the histogram.
rmm::device_buffer d_temp(temp_storage_bytes, stream);
cub::DeviceHistogram::HistogramEven(d_temp.data(),
temp_storage_bytes,
input.chars_begin(stream),
histogram.begin(),
num_levels,
lower_level,
upper_level,
num_chars,
stream.value());

auto const zero_level_idx = -lower_level; // the bin storing count for character `\0`
// Find the first bin at or above `\0` that has a zero count and maps to an allowed delimiter.
auto const first_zero_count_pos = thrust::find_if(
rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator(0) + zero_level_idx, // ignore the negative characters
thrust::make_counting_iterator(0) + num_levels,
[zero_level_idx, counts = histogram.begin()] __device__(auto idx) -> bool {
auto const count = counts[idx];
if (count > 0) { return false; }
auto const first_non_existing_char = static_cast<char>(idx - zero_level_idx);
return can_be_delimiter(first_non_existing_char);
});

// This should never happen since the input should never cover the entire char range.
if (first_zero_count_pos == thrust::make_counting_iterator(0) + num_levels) {
throw std::logic_error(
"Cannot find any character suitable as delimiter during joining json strings.");
}
// Convert the bin position back to a character value (distance from the `\0` bin).
delimiter = static_cast<char>(
thrust::distance(thrust::make_counting_iterator(0) + zero_level_idx, first_zero_count_pos));
}

auto d_offsets_colview = input.offsets();
CUDF_EXPECTS(d_offsets_colview.null_count() == 0, "how can offsets have null count");
device_span<cudf::size_type const> d_offsets(d_offsets_colview.data<cudf::size_type>(),
d_offsets_colview.size());

// Output size: all characters plus one delimiter between each pair of consecutive rows
// (rows = d_offsets.size() - 1, so delimiters = d_offsets.size() - 2).
// NOTE(review): `d_offsets.size() - 2` looks like it would wrap around if the offsets have fewer
// than two elements (e.g. an empty column) -- confirm callers never pass such input.
rmm::device_buffer concatenated_buffer(num_chars + d_offsets.size() - 2, stream);

// Write the delimiters: for idx = 1, 2, ... the delimiter preceding row `idx` goes to output
// position d_offsets[idx] + idx - 1.
thrust::scatter(
rmm::exec_policy_nosync(stream),
thrust::make_constant_iterator(delimiter),
thrust::make_constant_iterator(delimiter) + d_offsets.size() - 2,
thrust::make_transform_iterator(
thrust::make_counting_iterator(1),
cuda::proclaim_return_type<cudf::size_type>(
[d_offsets = d_offsets.begin()] __device__(cudf::size_type idx) -> cudf::size_type {
return d_offsets[idx] + idx - 1;
})),
reinterpret_cast<char*>(concatenated_buffer.data()));

{
// Copy each row's characters with a CUB batched copy: row `idx` is read from
// input + d_offsets[idx] and written to output + d_offsets[idx] + idx, which leaves one
// position free before each row after the first for its delimiter.
auto input_it = thrust::make_transform_iterator(
thrust::make_counting_iterator(0),
cuda::proclaim_return_type<char const*>(
[input = input.chars_begin(stream), d_offsets = d_offsets.begin()] __device__(
cudf::size_type idx) -> char const* { return input + d_offsets[idx]; }));
auto output_it = thrust::make_transform_iterator(
thrust::make_counting_iterator(0),
cuda::proclaim_return_type<char*>(
[output = reinterpret_cast<char*>(concatenated_buffer.data()),
d_offsets = d_offsets.begin()] __device__(cudf::size_type idx) -> char* {
return output + d_offsets[idx] + idx;
}));
// Size of row `idx` in bytes is the difference between consecutive offsets.
auto sizes_it = thrust::make_transform_iterator(
thrust::make_counting_iterator(0),
cuda::proclaim_return_type<cudf::size_type>(
[d_offsets = d_offsets.begin()] __device__(cudf::size_type idx) -> cudf::size_type {
return d_offsets[idx + 1] - d_offsets[idx];
}));
// Same two-call pattern as above: query the temporary storage size, then run the copy.
size_t temp_storage_bytes = 0;
cub::DeviceCopy::Batched(nullptr,
temp_storage_bytes,
input_it,
output_it,
sizes_it,
static_cast<uint32_t>(d_offsets.size() - 1),
stream.value());
rmm::device_buffer temp_storage(temp_storage_bytes, stream);
cub::DeviceCopy::Batched(temp_storage.data(),
temp_storage_bytes,
input_it,
output_it,
sizes_it,
static_cast<uint32_t>(d_offsets.size() - 1),
stream.value());
}

return std::tuple{std::move(concatenated_buffer), delimiter};
}

} // namespace cudf::io::json::detail
5 changes: 5 additions & 0 deletions cpp/src/io/json/read_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <cudf/io/datasource.hpp>
#include <cudf/io/json.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/export.hpp>
#include <cudf/utilities/memory_resource.hpp>
Expand Down Expand Up @@ -73,5 +74,9 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

std::tuple<rmm::device_buffer, char> preprocess(cudf::strings_column_view const& input,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this function is called only in testing. Do we ever need it anywhere else in the source code? If not, can we generate the test string directly without it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you can test without this function. But the idea is that each string row is followed by one delimiter that is not present in the strings. This function was provided by @shrshi so that you can easily convert a strings column into an rmm buffer and a delimiter.

rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

} // namespace io::json::detail
} // namespace CUDF_EXPORT cudf
2 changes: 1 addition & 1 deletion cpp/tests/io/json/nested_json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ TEST_F(JsonTest, PostProcessTokenStream)

// Run system-under-test
auto [d_filtered_tokens, d_filtered_indices] =
cuio_json::detail::process_token_stream(d_tokens, d_offsets, stream);
cuio_json::detail::process_token_stream(d_tokens, d_offsets, false, stream);

auto const filtered_tokens = cudf::detail::make_std_vector_async(d_filtered_tokens, stream);
auto const filtered_indices = cudf::detail::make_std_vector_async(d_filtered_indices, stream);
Expand Down
Loading