Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ORC writer crash with empty input columns #9808

Merged
merged 5 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
338 changes: 172 additions & 166 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -579,12 +579,15 @@ orc_streams writer::impl::create_streams(host_span<orc_column_view> columns,
}

auto const direct_data_size =
std::accumulate(segmentation.stripes.front().cbegin(),
segmentation.stripes.back().cend(),
size_t{0},
[&](auto data_size, auto rg_idx) {
return data_size + column.host_dict_chunk(rg_idx)->string_char_count;
});
segmentation.num_stripes() == 0
? 0
: std::accumulate(segmentation.stripes.front().cbegin(),
segmentation.stripes.back().cend(),
size_t{0},
[&](auto data_size, auto rg_idx) {
return data_size +
column.host_dict_chunk(rg_idx)->string_char_count;
});
if (enable_dict) {
uint32_t dict_bits = 0;
for (dict_bits = 1; dict_bits < 32; dict_bits <<= 1) {
Expand Down Expand Up @@ -988,17 +991,19 @@ encoded_data encode_columns(orc_table_view const& orc_table,
}
chunk_streams.host_to_device(stream);

if (orc_table.num_string_columns() != 0) {
auto d_stripe_dict = orc_table.string_column(0).device_stripe_dict();
gpu::EncodeStripeDictionaries(d_stripe_dict,
chunks,
orc_table.num_string_columns(),
segmentation.num_stripes(),
chunk_streams,
stream);
}
if (orc_table.num_rows() > 0) {
if (orc_table.num_string_columns() != 0) {
vuule marked this conversation as resolved.
Show resolved Hide resolved
auto d_stripe_dict = orc_table.string_column(0).device_stripe_dict();
gpu::EncodeStripeDictionaries(d_stripe_dict,
chunks,
orc_table.num_string_columns(),
segmentation.num_stripes(),
chunk_streams,
stream);
}

gpu::EncodeOrcColumnData(chunks, chunk_streams, stream);
gpu::EncodeOrcColumnData(chunks, chunk_streams, stream);
}
dictionaries.data.clear();
dictionaries.index.clear();
stream.synchronize();
Expand Down Expand Up @@ -1803,7 +1808,7 @@ void writer::impl::write(table_view const& table)
auto dictionaries = allocate_dictionaries(orc_table, rowgroup_bounds, stream);
hostdevice_2dvector<gpu::DictionaryChunk> dict(
rowgroup_bounds.size().first, orc_table.num_string_columns(), stream);
if (orc_table.num_string_columns() != 0) {
if (not dict.is_empty()) {
vyasr marked this conversation as resolved.
Show resolved Hide resolved
init_dictionaries(orc_table,
rowgroup_bounds,
dictionaries.d_data_view,
Expand All @@ -1819,7 +1824,7 @@ void writer::impl::write(table_view const& table)
// Build stripe-level dictionaries
hostdevice_2dvector<gpu::StripeDictionary> stripe_dict(
segmentation.num_stripes(), orc_table.num_string_columns(), stream);
if (orc_table.num_string_columns() != 0) {
if (not stripe_dict.is_empty()) {
build_dictionaries(orc_table,
segmentation.stripes,
dict,
Expand All @@ -1842,165 +1847,166 @@ void writer::impl::write(table_view const& table)
segmentation.num_stripes(), num_data_streams, stream);
auto stripes = gather_stripes(num_index_streams, segmentation, &enc_data.streams, &strm_descs);

// Gather column statistics
std::vector<ColStatsBlob> column_stats;
if (enable_statistics_ && table.num_columns() > 0 && num_rows > 0) {
column_stats = gather_statistic_blobs(orc_table, segmentation);
}
if (num_rows > 0) {
// Gather column statistics
auto const column_stats = enable_statistics_ && table.num_columns() > 0
? gather_statistic_blobs(orc_table, segmentation)
: std::vector<ColStatsBlob>{};

// Allocate intermediate output stream buffer
size_t compressed_bfr_size = 0;
size_t num_compressed_blocks = 0;
size_t max_compressed_block_size = 0;
if (compression_kind_ != NONE) {
nvcompBatchedSnappyCompressGetMaxOutputChunkSize(
compression_blocksize_, nvcompBatchedSnappyDefaultOpts, &max_compressed_block_size);
}
auto stream_output = [&]() {
size_t max_stream_size = 0;
bool all_device_write = true;
// Allocate intermediate output stream buffer
size_t compressed_bfr_size = 0;
size_t num_compressed_blocks = 0;
size_t max_compressed_block_size = 0;
if (compression_kind_ != NONE) {
nvcompBatchedSnappyCompressGetMaxOutputChunkSize(
compression_blocksize_, nvcompBatchedSnappyDefaultOpts, &max_compressed_block_size);
}
auto stream_output = [&]() {
size_t max_stream_size = 0;
bool all_device_write = true;

for (auto& ss : strm_descs.host_view().flat_view()) {
if (!out_sink_->is_device_write_preferred(ss.stream_size)) { all_device_write = false; }
size_t stream_size = ss.stream_size;
if (compression_kind_ != NONE) {
ss.first_block = num_compressed_blocks;
ss.bfr_offset = compressed_bfr_size;

auto num_blocks = std::max<uint32_t>(
(stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1);
stream_size += num_blocks * BLOCK_HEADER_SIZE;
num_compressed_blocks += num_blocks;
compressed_bfr_size += (max_compressed_block_size + BLOCK_HEADER_SIZE) * num_blocks;
}
max_stream_size = std::max(max_stream_size, stream_size);
}

for (auto& ss : strm_descs.host_view().flat_view()) {
if (!out_sink_->is_device_write_preferred(ss.stream_size)) { all_device_write = false; }
size_t stream_size = ss.stream_size;
if (compression_kind_ != NONE) {
ss.first_block = num_compressed_blocks;
ss.bfr_offset = compressed_bfr_size;

auto num_blocks = std::max<uint32_t>(
(stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1);
stream_size += num_blocks * BLOCK_HEADER_SIZE;
num_compressed_blocks += num_blocks;
compressed_bfr_size += (max_compressed_block_size + BLOCK_HEADER_SIZE) * num_blocks;
if (all_device_write) {
return pinned_buffer<uint8_t>{nullptr, cudaFreeHost};
} else {
return pinned_buffer<uint8_t>{[](size_t size) {
uint8_t* ptr = nullptr;
CUDA_TRY(cudaMallocHost(&ptr, size));
return ptr;
}(max_stream_size),
cudaFreeHost};
}
max_stream_size = std::max(max_stream_size, stream_size);
}
}();

if (all_device_write) {
return pinned_buffer<uint8_t>{nullptr, cudaFreeHost};
} else {
return pinned_buffer<uint8_t>{[](size_t size) {
uint8_t* ptr = nullptr;
CUDA_TRY(cudaMallocHost(&ptr, size));
return ptr;
}(max_stream_size),
cudaFreeHost};
// Compress the data streams
rmm::device_buffer compressed_data(compressed_bfr_size, stream);
hostdevice_vector<gpu_inflate_status_s> comp_out(num_compressed_blocks, stream);
hostdevice_vector<gpu_inflate_input_s> comp_in(num_compressed_blocks, stream);
if (compression_kind_ != NONE) {
strm_descs.host_to_device(stream);
gpu::CompressOrcDataStreams(static_cast<uint8_t*>(compressed_data.data()),
num_compressed_blocks,
compression_kind_,
compression_blocksize_,
max_compressed_block_size,
strm_descs,
enc_data.streams,
comp_in,
comp_out,
stream);
strm_descs.device_to_host(stream);
comp_out.device_to_host(stream, true);
}
}();

// Compress the data streams
rmm::device_buffer compressed_data(compressed_bfr_size, stream);
hostdevice_vector<gpu_inflate_status_s> comp_out(num_compressed_blocks, stream);
hostdevice_vector<gpu_inflate_input_s> comp_in(num_compressed_blocks, stream);
if (compression_kind_ != NONE) {
strm_descs.host_to_device(stream);
gpu::CompressOrcDataStreams(static_cast<uint8_t*>(compressed_data.data()),
num_compressed_blocks,
compression_kind_,
compression_blocksize_,
max_compressed_block_size,
strm_descs,
enc_data.streams,
comp_in,
comp_out,
stream);
strm_descs.device_to_host(stream);
comp_out.device_to_host(stream, true);
}

ProtobufWriter pbw_(&buffer_);

// Write stripes
std::vector<std::future<void>> write_tasks;
for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) {
auto const& rowgroups_range = segmentation.stripes[stripe_id];
auto& stripe = stripes[stripe_id];

stripe.offset = out_sink_->bytes_written();

// Column (skippable) index streams appear at the start of the stripe
for (size_type stream_id = 0; stream_id < num_index_streams; ++stream_id) {
write_index_stream(stripe_id,
stream_id,
orc_table.columns,
rowgroups_range,
enc_data.streams,
strm_descs,
comp_out,
&stripe,
&streams,
&pbw_);
}
ProtobufWriter pbw_(&buffer_);

// Write stripes
std::vector<std::future<void>> write_tasks;
for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) {
auto const& rowgroups_range = segmentation.stripes[stripe_id];
auto& stripe = stripes[stripe_id];

stripe.offset = out_sink_->bytes_written();

// Column (skippable) index streams appear at the start of the stripe
for (size_type stream_id = 0; stream_id < num_index_streams; ++stream_id) {
write_index_stream(stripe_id,
stream_id,
orc_table.columns,
rowgroups_range,
enc_data.streams,
strm_descs,
comp_out,
&stripe,
&streams,
&pbw_);
}

// Column data consisting one or more separate streams
for (auto const& strm_desc : strm_descs[stripe_id]) {
write_tasks.push_back(
write_data_stream(strm_desc,
enc_data.streams[strm_desc.column_id][rowgroups_range.first],
static_cast<uint8_t const*>(compressed_data.data()),
stream_output.get(),
&stripe,
&streams));
}
// Column data consisting one or more separate streams
for (auto const& strm_desc : strm_descs[stripe_id]) {
write_tasks.push_back(
write_data_stream(strm_desc,
enc_data.streams[strm_desc.column_id][rowgroups_range.first],
static_cast<uint8_t const*>(compressed_data.data()),
stream_output.get(),
&stripe,
&streams));
}

// Write stripefooter consisting of stream information
StripeFooter sf;
sf.streams = streams;
sf.columns.resize(orc_table.num_columns() + 1);
sf.columns[0].kind = DIRECT;
for (size_t i = 1; i < sf.columns.size(); ++i) {
sf.columns[i].kind = orc_table.column(i - 1).orc_encoding();
sf.columns[i].dictionarySize =
(sf.columns[i].kind == DICTIONARY_V2)
? orc_table.column(i - 1).host_stripe_dict(stripe_id)->num_strings
: 0;
if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; }
// Write stripefooter consisting of stream information
StripeFooter sf;
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
sf.streams = streams;
sf.columns.resize(orc_table.num_columns() + 1);
sf.columns[0].kind = DIRECT;
for (size_t i = 1; i < sf.columns.size(); ++i) {
sf.columns[i].kind = orc_table.column(i - 1).orc_encoding();
sf.columns[i].dictionarySize =
(sf.columns[i].kind == DICTIONARY_V2)
? orc_table.column(i - 1).host_stripe_dict(stripe_id)->num_strings
: 0;
if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; }
}
buffer_.resize((compression_kind_ != NONE) ? 3 : 0);
pbw_.write(sf);
stripe.footerLength = buffer_.size();
if (compression_kind_ != NONE) {
uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1;
buffer_[0] = static_cast<uint8_t>(uncomp_sf_len >> 0);
buffer_[1] = static_cast<uint8_t>(uncomp_sf_len >> 8);
buffer_[2] = static_cast<uint8_t>(uncomp_sf_len >> 16);
}
out_sink_->host_write(buffer_.data(), buffer_.size());
}
buffer_.resize((compression_kind_ != NONE) ? 3 : 0);
pbw_.write(sf);
stripe.footerLength = buffer_.size();
if (compression_kind_ != NONE) {
uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1;
buffer_[0] = static_cast<uint8_t>(uncomp_sf_len >> 0);
buffer_[1] = static_cast<uint8_t>(uncomp_sf_len >> 8);
buffer_[2] = static_cast<uint8_t>(uncomp_sf_len >> 16);
for (auto const& task : write_tasks) {
task.wait();
}
out_sink_->host_write(buffer_.data(), buffer_.size());
}
for (auto const& task : write_tasks) {
task.wait();
}

if (column_stats.size() != 0) {
// File-level statistics
// NOTE: Excluded from chunked write mode to avoid the need for merging stats across calls
if (single_write_mode) {
// First entry contains total number of rows
buffer_.resize(0);
pbw_.putb(1 * 8 + PB_TYPE_VARINT);
pbw_.put_uint(num_rows);
ff.statistics.reserve(1 + orc_table.num_columns());
ff.statistics.emplace_back(std::move(buffer_));
// Add file stats, stored after stripe stats in `column_stats`
ff.statistics.insert(
ff.statistics.end(),
std::make_move_iterator(column_stats.begin()) + stripes.size() * orc_table.num_columns(),
std::make_move_iterator(column_stats.end()));
}
// Stripe-level statistics
size_t first_stripe = md.stripeStats.size();
md.stripeStats.resize(first_stripe + stripes.size());
for (size_t stripe_id = 0; stripe_id < stripes.size(); stripe_id++) {
md.stripeStats[first_stripe + stripe_id].colStats.resize(1 + orc_table.num_columns());
buffer_.resize(0);
pbw_.putb(1 * 8 + PB_TYPE_VARINT);
pbw_.put_uint(stripes[stripe_id].numberOfRows);
md.stripeStats[first_stripe + stripe_id].colStats[0] = std::move(buffer_);
for (size_t col_idx = 0; col_idx < orc_table.num_columns(); col_idx++) {
size_t idx = stripes.size() * col_idx + stripe_id;
if (idx < column_stats.size()) {
md.stripeStats[first_stripe + stripe_id].colStats[1 + col_idx] =
std::move(column_stats[idx]);
if (not column_stats.empty()) {
// File-level statistics
// NOTE: Excluded from chunked write mode to avoid the need for merging stats across calls
if (single_write_mode) {
// First entry contains total number of rows
buffer_.resize(0);
pbw_.putb(1 * 8 + PB_TYPE_VARINT);
pbw_.put_uint(num_rows);
ff.statistics.reserve(1 + orc_table.num_columns());
ff.statistics.emplace_back(std::move(buffer_));
// Add file stats, stored after stripe stats in `column_stats`
ff.statistics.insert(
ff.statistics.end(),
std::make_move_iterator(column_stats.begin()) + stripes.size() * orc_table.num_columns(),
std::make_move_iterator(column_stats.end()));
}
// Stripe-level statistics
size_t first_stripe = md.stripeStats.size();
md.stripeStats.resize(first_stripe + stripes.size());
for (size_t stripe_id = 0; stripe_id < stripes.size(); stripe_id++) {
md.stripeStats[first_stripe + stripe_id].colStats.resize(1 + orc_table.num_columns());
buffer_.resize(0);
pbw_.putb(1 * 8 + PB_TYPE_VARINT);
pbw_.put_uint(stripes[stripe_id].numberOfRows);
md.stripeStats[first_stripe + stripe_id].colStats[0] = std::move(buffer_);
for (size_t col_idx = 0; col_idx < orc_table.num_columns(); col_idx++) {
size_t idx = stripes.size() * col_idx + stripe_id;
if (idx < column_stats.size()) {
md.stripeStats[first_stripe + stripe_id].colStats[1 + col_idx] =
std::move(column_stats[idx]);
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/utilities/hostdevice_vector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class hostdevice_2dvector {

auto size() const noexcept { return _size; }
auto count() const noexcept { return _size.first * _size.second; }
auto is_empty() const noexcept { return count() == 0; }

T* base_host_ptr(size_t offset = 0) { return _data.host_ptr(offset); }
T* base_device_ptr(size_t offset = 0) { return _data.device_ptr(offset); }
Expand Down
Loading