Skip to content

Commit

Permalink
Fix ORC writer crash with empty input columns (#9808)
Browse files Browse the repository at this point in the history
Fixes #9783

Skip some parts of writing when the input table was zero rows.
Add is_empty to `hostdevice_2dvector`.
Add Python test with empty columns.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Devavret Makkar (https://github.com/devavret)
  - Conor Hoekstra (https://github.com/codereport)

URL: #9808
  • Loading branch information
vuule authored Dec 2, 2021
1 parent 50acf07 commit b848dd5
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 166 deletions.
338 changes: 172 additions & 166 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -579,12 +579,15 @@ orc_streams writer::impl::create_streams(host_span<orc_column_view> columns,
}

auto const direct_data_size =
std::accumulate(segmentation.stripes.front().cbegin(),
segmentation.stripes.back().cend(),
size_t{0},
[&](auto data_size, auto rg_idx) {
return data_size + column.host_dict_chunk(rg_idx)->string_char_count;
});
segmentation.num_stripes() == 0
? 0
: std::accumulate(segmentation.stripes.front().cbegin(),
segmentation.stripes.back().cend(),
size_t{0},
[&](auto data_size, auto rg_idx) {
return data_size +
column.host_dict_chunk(rg_idx)->string_char_count;
});
if (enable_dict) {
uint32_t dict_bits = 0;
for (dict_bits = 1; dict_bits < 32; dict_bits <<= 1) {
Expand Down Expand Up @@ -988,17 +991,19 @@ encoded_data encode_columns(orc_table_view const& orc_table,
}
chunk_streams.host_to_device(stream);

if (orc_table.num_string_columns() != 0) {
auto d_stripe_dict = orc_table.string_column(0).device_stripe_dict();
gpu::EncodeStripeDictionaries(d_stripe_dict,
chunks,
orc_table.num_string_columns(),
segmentation.num_stripes(),
chunk_streams,
stream);
}
if (orc_table.num_rows() > 0) {
if (orc_table.num_string_columns() != 0) {
auto d_stripe_dict = orc_table.string_column(0).device_stripe_dict();
gpu::EncodeStripeDictionaries(d_stripe_dict,
chunks,
orc_table.num_string_columns(),
segmentation.num_stripes(),
chunk_streams,
stream);
}

gpu::EncodeOrcColumnData(chunks, chunk_streams, stream);
gpu::EncodeOrcColumnData(chunks, chunk_streams, stream);
}
dictionaries.data.clear();
dictionaries.index.clear();
stream.synchronize();
Expand Down Expand Up @@ -1803,7 +1808,7 @@ void writer::impl::write(table_view const& table)
auto dictionaries = allocate_dictionaries(orc_table, rowgroup_bounds, stream);
hostdevice_2dvector<gpu::DictionaryChunk> dict(
rowgroup_bounds.size().first, orc_table.num_string_columns(), stream);
if (orc_table.num_string_columns() != 0) {
if (not dict.is_empty()) {
init_dictionaries(orc_table,
rowgroup_bounds,
dictionaries.d_data_view,
Expand All @@ -1819,7 +1824,7 @@ void writer::impl::write(table_view const& table)
// Build stripe-level dictionaries
hostdevice_2dvector<gpu::StripeDictionary> stripe_dict(
segmentation.num_stripes(), orc_table.num_string_columns(), stream);
if (orc_table.num_string_columns() != 0) {
if (not stripe_dict.is_empty()) {
build_dictionaries(orc_table,
segmentation.stripes,
dict,
Expand All @@ -1842,165 +1847,166 @@ void writer::impl::write(table_view const& table)
segmentation.num_stripes(), num_data_streams, stream);
auto stripes = gather_stripes(num_index_streams, segmentation, &enc_data.streams, &strm_descs);

// Gather column statistics
std::vector<ColStatsBlob> column_stats;
if (enable_statistics_ && table.num_columns() > 0 && num_rows > 0) {
column_stats = gather_statistic_blobs(orc_table, segmentation);
}
if (num_rows > 0) {
// Gather column statistics
auto const column_stats = enable_statistics_ && table.num_columns() > 0
? gather_statistic_blobs(orc_table, segmentation)
: std::vector<ColStatsBlob>{};

// Allocate intermediate output stream buffer
size_t compressed_bfr_size = 0;
size_t num_compressed_blocks = 0;
size_t max_compressed_block_size = 0;
if (compression_kind_ != NONE) {
nvcompBatchedSnappyCompressGetMaxOutputChunkSize(
compression_blocksize_, nvcompBatchedSnappyDefaultOpts, &max_compressed_block_size);
}
auto stream_output = [&]() {
size_t max_stream_size = 0;
bool all_device_write = true;
// Allocate intermediate output stream buffer
size_t compressed_bfr_size = 0;
size_t num_compressed_blocks = 0;
size_t max_compressed_block_size = 0;
if (compression_kind_ != NONE) {
nvcompBatchedSnappyCompressGetMaxOutputChunkSize(
compression_blocksize_, nvcompBatchedSnappyDefaultOpts, &max_compressed_block_size);
}
auto stream_output = [&]() {
size_t max_stream_size = 0;
bool all_device_write = true;

for (auto& ss : strm_descs.host_view().flat_view()) {
if (!out_sink_->is_device_write_preferred(ss.stream_size)) { all_device_write = false; }
size_t stream_size = ss.stream_size;
if (compression_kind_ != NONE) {
ss.first_block = num_compressed_blocks;
ss.bfr_offset = compressed_bfr_size;

auto num_blocks = std::max<uint32_t>(
(stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1);
stream_size += num_blocks * BLOCK_HEADER_SIZE;
num_compressed_blocks += num_blocks;
compressed_bfr_size += (max_compressed_block_size + BLOCK_HEADER_SIZE) * num_blocks;
}
max_stream_size = std::max(max_stream_size, stream_size);
}

for (auto& ss : strm_descs.host_view().flat_view()) {
if (!out_sink_->is_device_write_preferred(ss.stream_size)) { all_device_write = false; }
size_t stream_size = ss.stream_size;
if (compression_kind_ != NONE) {
ss.first_block = num_compressed_blocks;
ss.bfr_offset = compressed_bfr_size;

auto num_blocks = std::max<uint32_t>(
(stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1);
stream_size += num_blocks * BLOCK_HEADER_SIZE;
num_compressed_blocks += num_blocks;
compressed_bfr_size += (max_compressed_block_size + BLOCK_HEADER_SIZE) * num_blocks;
if (all_device_write) {
return pinned_buffer<uint8_t>{nullptr, cudaFreeHost};
} else {
return pinned_buffer<uint8_t>{[](size_t size) {
uint8_t* ptr = nullptr;
CUDA_TRY(cudaMallocHost(&ptr, size));
return ptr;
}(max_stream_size),
cudaFreeHost};
}
max_stream_size = std::max(max_stream_size, stream_size);
}
}();

if (all_device_write) {
return pinned_buffer<uint8_t>{nullptr, cudaFreeHost};
} else {
return pinned_buffer<uint8_t>{[](size_t size) {
uint8_t* ptr = nullptr;
CUDA_TRY(cudaMallocHost(&ptr, size));
return ptr;
}(max_stream_size),
cudaFreeHost};
// Compress the data streams
rmm::device_buffer compressed_data(compressed_bfr_size, stream);
hostdevice_vector<gpu_inflate_status_s> comp_out(num_compressed_blocks, stream);
hostdevice_vector<gpu_inflate_input_s> comp_in(num_compressed_blocks, stream);
if (compression_kind_ != NONE) {
strm_descs.host_to_device(stream);
gpu::CompressOrcDataStreams(static_cast<uint8_t*>(compressed_data.data()),
num_compressed_blocks,
compression_kind_,
compression_blocksize_,
max_compressed_block_size,
strm_descs,
enc_data.streams,
comp_in,
comp_out,
stream);
strm_descs.device_to_host(stream);
comp_out.device_to_host(stream, true);
}
}();

// Compress the data streams
rmm::device_buffer compressed_data(compressed_bfr_size, stream);
hostdevice_vector<gpu_inflate_status_s> comp_out(num_compressed_blocks, stream);
hostdevice_vector<gpu_inflate_input_s> comp_in(num_compressed_blocks, stream);
if (compression_kind_ != NONE) {
strm_descs.host_to_device(stream);
gpu::CompressOrcDataStreams(static_cast<uint8_t*>(compressed_data.data()),
num_compressed_blocks,
compression_kind_,
compression_blocksize_,
max_compressed_block_size,
strm_descs,
enc_data.streams,
comp_in,
comp_out,
stream);
strm_descs.device_to_host(stream);
comp_out.device_to_host(stream, true);
}

ProtobufWriter pbw_(&buffer_);

// Write stripes
std::vector<std::future<void>> write_tasks;
for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) {
auto const& rowgroups_range = segmentation.stripes[stripe_id];
auto& stripe = stripes[stripe_id];

stripe.offset = out_sink_->bytes_written();

// Column (skippable) index streams appear at the start of the stripe
for (size_type stream_id = 0; stream_id < num_index_streams; ++stream_id) {
write_index_stream(stripe_id,
stream_id,
orc_table.columns,
rowgroups_range,
enc_data.streams,
strm_descs,
comp_out,
&stripe,
&streams,
&pbw_);
}
ProtobufWriter pbw_(&buffer_);

// Write stripes
std::vector<std::future<void>> write_tasks;
for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) {
auto const& rowgroups_range = segmentation.stripes[stripe_id];
auto& stripe = stripes[stripe_id];

stripe.offset = out_sink_->bytes_written();

// Column (skippable) index streams appear at the start of the stripe
for (size_type stream_id = 0; stream_id < num_index_streams; ++stream_id) {
write_index_stream(stripe_id,
stream_id,
orc_table.columns,
rowgroups_range,
enc_data.streams,
strm_descs,
comp_out,
&stripe,
&streams,
&pbw_);
}

// Column data consisting one or more separate streams
for (auto const& strm_desc : strm_descs[stripe_id]) {
write_tasks.push_back(
write_data_stream(strm_desc,
enc_data.streams[strm_desc.column_id][rowgroups_range.first],
static_cast<uint8_t const*>(compressed_data.data()),
stream_output.get(),
&stripe,
&streams));
}
// Column data consisting one or more separate streams
for (auto const& strm_desc : strm_descs[stripe_id]) {
write_tasks.push_back(
write_data_stream(strm_desc,
enc_data.streams[strm_desc.column_id][rowgroups_range.first],
static_cast<uint8_t const*>(compressed_data.data()),
stream_output.get(),
&stripe,
&streams));
}

// Write stripefooter consisting of stream information
StripeFooter sf;
sf.streams = streams;
sf.columns.resize(orc_table.num_columns() + 1);
sf.columns[0].kind = DIRECT;
for (size_t i = 1; i < sf.columns.size(); ++i) {
sf.columns[i].kind = orc_table.column(i - 1).orc_encoding();
sf.columns[i].dictionarySize =
(sf.columns[i].kind == DICTIONARY_V2)
? orc_table.column(i - 1).host_stripe_dict(stripe_id)->num_strings
: 0;
if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; }
// Write stripefooter consisting of stream information
StripeFooter sf;
sf.streams = streams;
sf.columns.resize(orc_table.num_columns() + 1);
sf.columns[0].kind = DIRECT;
for (size_t i = 1; i < sf.columns.size(); ++i) {
sf.columns[i].kind = orc_table.column(i - 1).orc_encoding();
sf.columns[i].dictionarySize =
(sf.columns[i].kind == DICTIONARY_V2)
? orc_table.column(i - 1).host_stripe_dict(stripe_id)->num_strings
: 0;
if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; }
}
buffer_.resize((compression_kind_ != NONE) ? 3 : 0);
pbw_.write(sf);
stripe.footerLength = buffer_.size();
if (compression_kind_ != NONE) {
uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1;
buffer_[0] = static_cast<uint8_t>(uncomp_sf_len >> 0);
buffer_[1] = static_cast<uint8_t>(uncomp_sf_len >> 8);
buffer_[2] = static_cast<uint8_t>(uncomp_sf_len >> 16);
}
out_sink_->host_write(buffer_.data(), buffer_.size());
}
buffer_.resize((compression_kind_ != NONE) ? 3 : 0);
pbw_.write(sf);
stripe.footerLength = buffer_.size();
if (compression_kind_ != NONE) {
uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1;
buffer_[0] = static_cast<uint8_t>(uncomp_sf_len >> 0);
buffer_[1] = static_cast<uint8_t>(uncomp_sf_len >> 8);
buffer_[2] = static_cast<uint8_t>(uncomp_sf_len >> 16);
for (auto const& task : write_tasks) {
task.wait();
}
out_sink_->host_write(buffer_.data(), buffer_.size());
}
for (auto const& task : write_tasks) {
task.wait();
}

if (column_stats.size() != 0) {
// File-level statistics
// NOTE: Excluded from chunked write mode to avoid the need for merging stats across calls
if (single_write_mode) {
// First entry contains total number of rows
buffer_.resize(0);
pbw_.putb(1 * 8 + PB_TYPE_VARINT);
pbw_.put_uint(num_rows);
ff.statistics.reserve(1 + orc_table.num_columns());
ff.statistics.emplace_back(std::move(buffer_));
// Add file stats, stored after stripe stats in `column_stats`
ff.statistics.insert(
ff.statistics.end(),
std::make_move_iterator(column_stats.begin()) + stripes.size() * orc_table.num_columns(),
std::make_move_iterator(column_stats.end()));
}
// Stripe-level statistics
size_t first_stripe = md.stripeStats.size();
md.stripeStats.resize(first_stripe + stripes.size());
for (size_t stripe_id = 0; stripe_id < stripes.size(); stripe_id++) {
md.stripeStats[first_stripe + stripe_id].colStats.resize(1 + orc_table.num_columns());
buffer_.resize(0);
pbw_.putb(1 * 8 + PB_TYPE_VARINT);
pbw_.put_uint(stripes[stripe_id].numberOfRows);
md.stripeStats[first_stripe + stripe_id].colStats[0] = std::move(buffer_);
for (size_t col_idx = 0; col_idx < orc_table.num_columns(); col_idx++) {
size_t idx = stripes.size() * col_idx + stripe_id;
if (idx < column_stats.size()) {
md.stripeStats[first_stripe + stripe_id].colStats[1 + col_idx] =
std::move(column_stats[idx]);
if (not column_stats.empty()) {
// File-level statistics
// NOTE: Excluded from chunked write mode to avoid the need for merging stats across calls
if (single_write_mode) {
// First entry contains total number of rows
buffer_.resize(0);
pbw_.putb(1 * 8 + PB_TYPE_VARINT);
pbw_.put_uint(num_rows);
ff.statistics.reserve(1 + orc_table.num_columns());
ff.statistics.emplace_back(std::move(buffer_));
// Add file stats, stored after stripe stats in `column_stats`
ff.statistics.insert(
ff.statistics.end(),
std::make_move_iterator(column_stats.begin()) + stripes.size() * orc_table.num_columns(),
std::make_move_iterator(column_stats.end()));
}
// Stripe-level statistics
size_t first_stripe = md.stripeStats.size();
md.stripeStats.resize(first_stripe + stripes.size());
for (size_t stripe_id = 0; stripe_id < stripes.size(); stripe_id++) {
md.stripeStats[first_stripe + stripe_id].colStats.resize(1 + orc_table.num_columns());
buffer_.resize(0);
pbw_.putb(1 * 8 + PB_TYPE_VARINT);
pbw_.put_uint(stripes[stripe_id].numberOfRows);
md.stripeStats[first_stripe + stripe_id].colStats[0] = std::move(buffer_);
for (size_t col_idx = 0; col_idx < orc_table.num_columns(); col_idx++) {
size_t idx = stripes.size() * col_idx + stripe_id;
if (idx < column_stats.size()) {
md.stripeStats[first_stripe + stripe_id].colStats[1 + col_idx] =
std::move(column_stats[idx]);
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/utilities/hostdevice_vector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class hostdevice_2dvector {

auto size() const noexcept { return _size; }
auto count() const noexcept { return _size.first * _size.second; }
auto is_empty() const noexcept { return count() == 0; }

T* base_host_ptr(size_t offset = 0) { return _data.host_ptr(offset); }
T* base_device_ptr(size_t offset = 0) { return _data.device_ptr(offset); }
Expand Down
Loading

0 comments on commit b848dd5

Please sign in to comment.