Add multi-threaded writing to GDS writes #9372

Merged: 8 commits, Oct 8, 2021

Changes from all commits
29 changes: 29 additions & 0 deletions cpp/include/cudf/io/data_sink.hpp
@@ -21,6 +21,7 @@

#include <rmm/cuda_stream_view.hpp>

#include <future>
#include <memory>
#include <string>
#include <vector>
@@ -132,6 +133,34 @@ class data_sink {
CUDF_FAIL("data_sink classes that support device_write must override it.");
}

/**
* @brief Asynchronously append the buffer content to the sink from a gpu address
*
* For optimal performance, should only be called when `is_device_write_preferred` returns `true`.
* Data sink implementations that don't support direct device writes don't need to override
* this function.
*
* `gpu_data` must not be freed until this call is synchronized.
* @code{.pseudo}
* auto result = device_write_async(gpu_data, size, stream);
* result.wait(); // OR result.get()
* @endcode
*
* @throws cudf::logic_error the object does not support direct device writes, i.e.
* `supports_device_write` returns `false`.
*
* @param gpu_data Pointer to the buffer to be written into the sink object
* @param size Number of bytes to write
* @param stream CUDA stream to use
*/
virtual std::future<void> device_write_async(void const* gpu_data,
size_t size,
rmm::cuda_stream_view stream)
{
CUDF_FAIL("data_sink classes that support device_write_async must override it.");
}

/**
* @brief Flush the data written into the sink
*/
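As context for the contract above: a user-provided sink that has no GDS support of its own can still satisfy `device_write_async` by staging through host memory and deferring the host write. A minimal sketch, not part of this PR; the `staging_sink` class and its buffer are hypothetical:

```cpp
#include <cudf/io/data_sink.hpp>
#include <rmm/cuda_stream_view.hpp>

#include <cuda_runtime.h>

#include <future>
#include <vector>

// Hypothetical sink: stages device data in a host buffer and defers the host
// write until the returned future is waited on.
class staging_sink : public cudf::io::data_sink {
 public:
  void host_write(void const* data, size_t size) override
  {
    auto const* bytes = static_cast<char const*>(data);
    _data.insert(_data.end(), bytes, bytes + size);
  }

  bool supports_device_write() const override { return true; }

  void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
  {
    device_write_async(gpu_data, size, stream).get();
  }

  std::future<void> device_write_async(void const* gpu_data,
                                       size_t size,
                                       rmm::cuda_stream_view stream) override
  {
    // Stage the device buffer eagerly, so the caller is free to release
    // `gpu_data` once the future is synchronized, as the contract requires.
    std::vector<char> staged(size);
    cudaMemcpyAsync(staged.data(), gpu_data, size, cudaMemcpyDeviceToHost, stream.value());
    stream.synchronize();
    return std::async(std::launch::deferred, [this, staged = std::move(staged)]() {
      host_write(staged.data(), staged.size());
    });
  }

  void flush() override {}
  size_t bytes_written() override { return _data.size(); }

 private:
  std::vector<char> _data;
};
```

With this shape, writer code can hold the returned future alongside GDS-backed futures and wait on them uniformly.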
53 changes: 32 additions & 21 deletions cpp/src/io/orc/writer_impl.cu
@@ -1223,30 +1223,36 @@ void writer::impl::write_index_stream(int32_t stripe_id,
stripe->indexLength += buffer_.size();
}

void writer::impl::write_data_stream(gpu::StripeStream const& strm_desc,
gpu::encoder_chunk_streams const& enc_stream,
uint8_t const* compressed_data,
uint8_t* stream_out,
StripeInformation* stripe,
orc_streams* streams)
std::future<void> writer::impl::write_data_stream(gpu::StripeStream const& strm_desc,
gpu::encoder_chunk_streams const& enc_stream,
uint8_t const* compressed_data,
uint8_t* stream_out,
StripeInformation* stripe,
orc_streams* streams)
{
const auto length = strm_desc.stream_size;
(*streams)[enc_stream.ids[strm_desc.stream_type]].length = length;
if (length == 0) { return; }
if (length == 0) {
return std::async(std::launch::deferred, [] {});
}

const auto* stream_in = (compression_kind_ == NONE) ? enc_stream.data_ptrs[strm_desc.stream_type]
: (compressed_data + strm_desc.bfr_offset);

if (out_sink_->is_device_write_preferred(length)) {
out_sink_->device_write(stream_in, length, stream);
} else {
CUDA_TRY(
cudaMemcpyAsync(stream_out, stream_in, length, cudaMemcpyDeviceToHost, stream.value()));
stream.synchronize();
auto write_task = [&]() {
if (out_sink_->is_device_write_preferred(length)) {
return out_sink_->device_write_async(stream_in, length, stream);
} else {
CUDA_TRY(
cudaMemcpyAsync(stream_out, stream_in, length, cudaMemcpyDeviceToHost, stream.value()));
stream.synchronize();

out_sink_->host_write(stream_out, length);
}
out_sink_->host_write(stream_out, length);
return std::async(std::launch::deferred, [] {});
}
}();
stripe->dataLength += length;
return write_task;
}

void writer::impl::add_uncompressed_block_headers(std::vector<uint8_t>& v)
@@ -1850,6 +1856,7 @@ void writer::impl::write(table_view const& table)
ProtobufWriter pbw_(&buffer_);

// Write stripes
std::vector<std::future<void>> write_tasks;
for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) {
auto const& rowgroups_range = segmentation.stripes[stripe_id];
auto& stripe = stripes[stripe_id];
@@ -1872,12 +1879,13 @@

// Column data consisting one or more separate streams
for (auto const& strm_desc : strm_descs[stripe_id]) {
write_data_stream(strm_desc,
enc_data.streams[strm_desc.column_id][rowgroups_range.first],
static_cast<uint8_t*>(compressed_data.data()),
stream_output.get(),
&stripe,
&streams);
write_tasks.push_back(
write_data_stream(strm_desc,
enc_data.streams[strm_desc.column_id][rowgroups_range.first],
static_cast<uint8_t const*>(compressed_data.data()),
stream_output.get(),
&stripe,
&streams));
}

// Write stripefooter consisting of stream information
@@ -1904,6 +1912,9 @@
}
out_sink_->host_write(buffer_.data(), buffer_.size());
}
for (auto const& task : write_tasks) {
task.wait();
}

if (column_stats.size() != 0) {
// File-level statistics
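The shape of the change above, returning a no-op deferred future when there is nothing to write so that every stream contributes exactly one entry to `write_tasks`, can be shown in isolation. A sketch with illustrative names only (`write_one` stands in for `write_data_stream`, and an async task stands in for the actual sink write):

```cpp
#include <cstddef>
#include <future>
#include <vector>

// Stand-in for the zero-length case: a future that is already satisfied once waited on.
std::future<void> make_noop_future() { return std::async(std::launch::deferred, [] {}); }

// Stand-in for write_data_stream(): either issues an asynchronous write or
// returns the no-op future, so the caller can treat all streams uniformly.
std::future<void> write_one(std::size_t length)
{
  if (length == 0) { return make_noop_future(); }
  return std::async(std::launch::async, [length] {
    (void)length;  // the real code writes `length` bytes to the sink here
  });
}

int main()
{
  std::vector<std::size_t> stream_lengths{0, 4096, 0, 1 << 20};
  std::vector<std::future<void>> write_tasks;
  for (auto length : stream_lengths) {
    write_tasks.push_back(write_one(length));  // writes overlap with later work
  }
  // Block once, after all streams have been issued, mirroring writer::impl::write().
  for (auto const& task : write_tasks) {
    task.wait();
  }
  return 0;
}
```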
13 changes: 7 additions & 6 deletions cpp/src/io/orc/writer_impl.hpp
@@ -325,13 +325,14 @@ class writer::impl {
* @param[in,out] stream_out Temporary host output buffer
* @param[in,out] stripe Stream's parent stripe
* @param[in,out] streams List of all streams
* @return An std::future that should be synchronized to ensure the writing is complete
*/
void write_data_stream(gpu::StripeStream const& strm_desc,
gpu::encoder_chunk_streams const& enc_stream,
uint8_t const* compressed_data,
uint8_t* stream_out,
StripeInformation* stripe,
orc_streams* streams);
std::future<void> write_data_stream(gpu::StripeStream const& strm_desc,
gpu::encoder_chunk_streams const& enc_stream,
uint8_t const* compressed_data,
uint8_t* stream_out,
StripeInformation* stripe,
orc_streams* streams);

/**
* @brief Insert 3-byte uncompressed block headers in a byte vector
7 changes: 6 additions & 1 deletion cpp/src/io/parquet/writer_impl.cu
@@ -1379,6 +1379,7 @@ void writer::impl::write(table_view const& table)
(stats_granularity_ == statistics_freq::STATISTICS_PAGE) ? page_stats.data() : nullptr,
(stats_granularity_ != statistics_freq::STATISTICS_NONE) ? page_stats.data() + num_pages
: nullptr);
std::vector<std::future<void>> write_tasks;
for (; r < rnext; r++, global_r++) {
for (auto i = 0; i < num_columns; i++) {
gpu::EncColumnChunk* ck = &chunks[r][i];
@@ -1392,7 +1393,8 @@

if (out_sink_->is_device_write_preferred(ck->compressed_size)) {
// let the writer do what it wants to retrieve the data from the gpu.
out_sink_->device_write(dev_bfr + ck->ck_stat_size, ck->compressed_size, stream);
write_tasks.push_back(
out_sink_->device_write_async(dev_bfr + ck->ck_stat_size, ck->compressed_size, stream));
// we still need to do a (much smaller) memcpy for the statistics.
if (ck->ck_stat_size != 0) {
md.row_groups[global_r].columns[i].meta_data.statistics_blob.resize(ck->ck_stat_size);
@@ -1438,6 +1440,9 @@
current_chunk_offset += ck->compressed_size;
}
}
for (auto const& task : write_tasks) {
task.wait();
}
Comment on lines +1443 to +1445

Contributor: Can we move this out of the loop so that we only wait after all encoding is done? Hard to tell if buffers are reused between batches.

Contributor (author):

> Hard to tell if buffers are reused between batches.

It is indeed.

}
}

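On the review question above: whether the `wait()` loop can move outside the batch loop hinges on whether a staging buffer is reused by the next batch. If it is, the in-flight writes must finish before the buffer is overwritten. A self-contained illustration of that constraint, with a host vector standing in for the staging buffer and `fake_device_write_async` standing in for the sink call (all names here are hypothetical):

```cpp
#include <cstddef>
#include <cstring>
#include <future>
#include <vector>

// Hypothetical async "write": reads from `src` on another thread after the call returns.
std::future<void> fake_device_write_async(char const* src, std::size_t size, std::vector<char>& sink)
{
  return std::async(std::launch::async,
                    [src, size, &sink] { sink.insert(sink.end(), src, src + size); });
}

void write_batches(std::size_t num_batches)
{
  std::vector<char> staging(1 << 20);  // shared staging buffer, reused for every batch
  std::vector<char> sink_bytes;

  for (std::size_t b = 0; b < num_batches; ++b) {
    // "Encode" batch b into the shared buffer.
    std::memset(staging.data(), static_cast<int>(b), staging.size());

    std::vector<std::future<void>> write_tasks;
    write_tasks.push_back(fake_device_write_async(staging.data(), staging.size(), sink_bytes));

    // This wait must stay inside the batch loop as long as `staging` is reused:
    // the next memset would otherwise race with a write still reading from it.
    // Moving it after the loop is only safe if each batch owns its own buffer.
    for (auto const& task : write_tasks) {
      task.wait();
    }
  }
}
```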
30 changes: 29 additions & 1 deletion cpp/src/io/utilities/data_sink.cpp
@@ -64,6 +64,17 @@ class file_sink : public data_sink {
_bytes_written += size;
}

std::future<void> device_write_async(void const* gpu_data,
size_t size,
rmm::cuda_stream_view stream) override
{
if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file.");

auto result = _cufile_out->write_async(gpu_data, _bytes_written, size);
_bytes_written += size;
return result;
}

private:
std::ofstream _output_stream;
size_t _bytes_written = 0;
@@ -111,6 +122,14 @@ class void_sink : public data_sink {
_bytes_written += size;
}

std::future<void> device_write_async(void const* gpu_data,
size_t size,
rmm::cuda_stream_view stream) override
{
_bytes_written += size;
return std::async(std::launch::deferred, [] {});
}

void flush() override {}

size_t bytes_written() override { return _bytes_written; }
@@ -132,10 +151,19 @@ class user_sink_wrapper : public data_sink {
void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(user_sink->supports_device_write(),
"device_write() being called on a data_sink that doesn't support it");
"device_write() was called on a data_sink that doesn't support it");
user_sink->device_write(gpu_data, size, stream);
}

std::future<void> device_write_async(void const* gpu_data,
size_t size,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(user_sink->supports_device_write(),
"device_write_async() was called on a data_sink that doesn't support it");
return user_sink->device_write_async(gpu_data, size, stream);
}

void flush() override { user_sink->flush(); }

size_t bytes_written() override { return user_sink->bytes_written(); }
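From the caller's side, the new entry point composes with the existing capability checks. A usage sketch, assuming GDS is available and using an illustrative output path (when `supports_device_write()` is false, callers fall back to copying to host and calling `host_write()`):

```cpp
#include <cudf/io/data_sink.hpp>
#include <rmm/cuda_stream_view.hpp>

#include <future>
#include <memory>
#include <vector>

void write_device_buffer(void const* gpu_data, size_t size, rmm::cuda_stream_view stream)
{
  auto sink = cudf::io::data_sink::create("example_output.bin");  // illustrative path

  std::vector<std::future<void>> pending;
  if (sink->supports_device_write() && sink->is_device_write_preferred(size)) {
    // Issue the GDS-backed write; `gpu_data` must stay alive until the future
    // is synchronized.
    pending.push_back(sink->device_write_async(gpu_data, size, stream));
  }
  // ... issue further writes here, overlapping them with other work ...

  for (auto const& task : pending) {
    task.wait();
  }
  sink->flush();
}
```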
81 changes: 62 additions & 19 deletions cpp/src/io/utilities/file_io_utilities.cpp
@@ -185,6 +185,31 @@ std::unique_ptr<datasource::buffer> cufile_input_impl::read(size_t offset,
return datasource::buffer::create(std::move(out_data));
}

namespace {

template <typename DataT,
typename F,
typename ResultT = std::invoke_result_t<F, DataT*, size_t, size_t>>
std::vector<std::future<ResultT>> make_sliced_tasks(
F function, DataT* ptr, size_t offset, size_t size, cudf::detail::thread_pool& pool)
{
std::vector<std::future<ResultT>> slice_tasks;
constexpr size_t max_slice_bytes = 4 * 1024 * 1024;
size_t const n_slices = util::div_rounding_up_safe(size, max_slice_bytes);
size_t slice_offset = 0;
for (size_t t = 0; t < n_slices; ++t) {
DataT* ptr_slice = ptr + slice_offset;

size_t const slice_size = (t == n_slices - 1) ? size % max_slice_bytes : max_slice_bytes;
slice_tasks.push_back(pool.submit(function, ptr_slice, slice_size, offset + slice_offset));

slice_offset += slice_size;
}
return slice_tasks;
}

} // namespace

std::future<size_t> cufile_input_impl::read_async(size_t offset,
size_t size,
uint8_t* dst,
@@ -193,32 +218,22 @@ std::future<size_t> cufile_input_impl::read_async(size_t offset,
int device;
cudaGetDevice(&device);

auto read_slice = [=](void* dst, size_t size, size_t offset) -> ssize_t {
auto read_slice = [device, gds_read = shim->read, file_handle = cf_file.handle()](
void* dst, size_t size, size_t offset) -> ssize_t {
cudaSetDevice(device);
auto read_size = shim->read(cf_file.handle(), dst, size, offset, 0);
auto read_size = gds_read(file_handle, dst, size, offset, 0);
CUDF_EXPECTS(read_size != -1, "cuFile error reading from a file");
return read_size;
};

std::vector<std::future<ssize_t>> slice_tasks;
constexpr size_t max_slice_bytes = 4 * 1024 * 1024;
size_t n_slices = util::div_rounding_up_safe(size, max_slice_bytes);
size_t slice_size = max_slice_bytes;
size_t slice_offset = 0;
for (size_t t = 0; t < n_slices; ++t) {
void* dst_slice = dst + slice_offset;

if (t == n_slices - 1) { slice_size = size % max_slice_bytes; }
slice_tasks.push_back(pool.submit(read_slice, dst_slice, slice_size, offset + slice_offset));
auto slice_tasks = make_sliced_tasks(read_slice, dst, offset, size, pool);

slice_offset += slice_size;
}
auto waiter = [](decltype(slice_tasks) slice_tasks) -> size_t {
auto waiter = [](auto slice_tasks) -> size_t {
return std::accumulate(slice_tasks.begin(), slice_tasks.end(), 0, [](auto sum, auto& task) {
return sum + task.get();
});
};
// The future returned from this function is deferred, not async because we want to avoid creating
// The future returned from this function is deferred, not async because we want to avoid creating
// threads for each read_async call. This overhead is significant in case of multiple small reads.
return std::async(std::launch::deferred, waiter, std::move(slice_tasks));
}
@@ -233,14 +248,42 @@ size_t cufile_input_impl::read(size_t offset,
}

cufile_output_impl::cufile_output_impl(std::string const& filepath)
: shim{cufile_shim::instance()}, cf_file(shim, filepath, O_CREAT | O_RDWR | O_DIRECT, 0664)
: shim{cufile_shim::instance()},
cf_file(shim, filepath, O_CREAT | O_RDWR | O_DIRECT, 0664),
pool(16)
{
}

void cufile_output_impl::write(void const* data, size_t offset, size_t size)
{
CUDF_EXPECTS(shim->write(cf_file.handle(), data, size, offset, 0) != -1,
"cuFile error writing to a file");
write_async(data, offset, size).wait();
}

std::future<void> cufile_output_impl::write_async(void const* data, size_t offset, size_t size)
{
int device;
cudaGetDevice(&device);

auto write_slice = [device, gds_write = shim->write, file_handle = cf_file.handle()](
void const* src, size_t size, size_t offset) -> void {
cudaSetDevice(device);
auto write_size = gds_write(file_handle, src, size, offset, 0);
CUDF_EXPECTS(write_size != -1 and write_size == static_cast<decltype(write_size)>(size),
"cuFile error writing to a file");
};

auto source = static_cast<uint8_t const*>(data);
auto slice_tasks = make_sliced_tasks(write_slice, source, offset, size, pool);

auto waiter = [](auto slice_tasks) -> void {
for (auto const& task : slice_tasks) {
task.wait();
}
};
// The future returned from this function is deferred, not async because we want to avoid creating
// threads for each write_async call. This overhead is significant in case of multiple small
// writes.
return std::async(std::launch::deferred, waiter, std::move(slice_tasks));
}
#endif

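The slicing in `make_sliced_tasks` cuts a transfer into 4 MiB pieces and gives the remainder to the last task. One detail worth noting: when `size` is an exact multiple of `max_slice_bytes`, `size % max_slice_bytes` evaluates to zero, so a defensive variant of the arithmetic computes the final slice as `size - slice_offset` instead. A standalone sketch of that slice computation (no cuFile or thread pool involved; names are illustrative):

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// Returns (offset, size) pairs covering `size` bytes in slices of at most
// `max_slice_bytes`. Using `size - slice_offset` for the tail keeps the last
// slice full even when `size` is an exact multiple of the slice size.
std::vector<std::pair<std::size_t, std::size_t>> make_slices(std::size_t size,
                                                             std::size_t max_slice_bytes)
{
  std::vector<std::pair<std::size_t, std::size_t>> slices;
  std::size_t slice_offset = 0;
  while (slice_offset < size) {
    auto const slice_size = std::min(max_slice_bytes, size - slice_offset);
    slices.emplace_back(slice_offset, slice_size);
    slice_offset += slice_size;
  }
  return slices;
}

// Example: make_slices(10 MiB, 4 MiB) -> {0, 4 MiB}, {4 MiB, 4 MiB}, {8 MiB, 2 MiB};
//          make_slices(8 MiB, 4 MiB)  -> {0, 4 MiB}, {4 MiB, 4 MiB}.
```

Each resulting (offset, size) pair would then be handed to the thread pool, much as `make_sliced_tasks` does with `pool.submit(function, ptr_slice, slice_size, offset + slice_offset)`.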