Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up CUDA stream use in cuIO #9991

Merged
merged 3 commits into the base branch
Jan 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cpp/src/io/avro/avro_gpu.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,17 +47,17 @@ struct schemadesc_s {
* @param[in] max_rows Maximum number of rows to load
* @param[in] first_row Crop all rows below first_row
* @param[in] min_row_size Minimum size in bytes of a row
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use
*/
void DecodeAvroColumnData(cudf::device_span<block_desc_s const> blocks,
schemadesc_s* schema,
cudf::device_span<string_index_pair const> global_dictionary,
uint8_t const* avro_data,
uint32_t schema_len,
size_t max_rows = ~0,
size_t first_row = 0,
uint32_t min_row_size = 0,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);
size_t max_rows,
size_t first_row,
uint32_t min_row_size,
rmm::cuda_stream_view stream);

} // namespace gpu
} // namespace avro
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/io/avro/reader_impl.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -159,8 +159,8 @@ rmm::device_buffer decompress_data(datasource& source,
if (meta.codec == "deflate") {
size_t uncompressed_data_size = 0;

auto inflate_in = hostdevice_vector<gpu_inflate_input_s>(meta.block_list.size());
auto inflate_out = hostdevice_vector<gpu_inflate_status_s>(meta.block_list.size());
auto inflate_in = hostdevice_vector<gpu_inflate_input_s>(meta.block_list.size(), stream);
auto inflate_out = hostdevice_vector<gpu_inflate_status_s>(meta.block_list.size(), stream);

// Guess an initial maximum uncompressed block size
uint32_t initial_blk_len = (meta.max_block_size * 2 + 0xfff) & ~0xfff;
Expand Down Expand Up @@ -343,7 +343,7 @@ std::vector<column_buffer> decode_data(metadata& meta,
}

// Build gpu schema
auto schema_desc = hostdevice_vector<gpu::schemadesc_s>(meta.schema.size());
auto schema_desc = hostdevice_vector<gpu::schemadesc_s>(meta.schema.size(), stream);

uint32_t min_row_data_size = 0;
int skip_field_cnt = 0;
Expand Down
46 changes: 23 additions & 23 deletions cpp/src/io/comp/gpuinflate.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2020, NVIDIA CORPORATION.
* Copyright (c) 2018-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,26 +49,26 @@ struct gpu_inflate_status_s {
*
* @param[in] inputs List of input argument structures
* @param[out] outputs List of output status structures
* @param[in] count Number of input/output structures, default 1
* @param[in] parse_hdr Whether or not to parse GZIP header, default false
* @param[in] stream CUDA stream to use, default 0
* @param[in] count Number of input/output structures
* @param[in] parse_hdr Whether or not to parse GZIP header
* @param[in] stream CUDA stream to use
*/
cudaError_t gpuinflate(gpu_inflate_input_s* inputs,
gpu_inflate_status_s* outputs,
int count = 1,
int parse_hdr = 0,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);
int count,
int parse_hdr,
rmm::cuda_stream_view stream);

/**
* @brief Interface for copying uncompressed byte blocks
*
* @param[in] inputs List of input argument structures
* @param[in] count Number of input structures, default 1
* @param[in] stream CUDA stream to use, default 0
* @param[in] count Number of input structures
* @param[in] stream CUDA stream to use
*/
cudaError_t gpu_copy_uncompressed_blocks(gpu_inflate_input_s* inputs,
int count = 1,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);
int count,
rmm::cuda_stream_view stream);

/**
* @brief Interface for decompressing Snappy-compressed data
Expand All @@ -78,13 +78,13 @@ cudaError_t gpu_copy_uncompressed_blocks(gpu_inflate_input_s* inputs,
*
* @param[in] inputs List of input argument structures
* @param[out] outputs List of output status structures
* @param[in] count Number of input/output structures, default 1
* @param[in] stream CUDA stream to use, default 0
* @param[in] count Number of input/output structures
* @param[in] stream CUDA stream to use
*/
cudaError_t gpu_unsnap(gpu_inflate_input_s* inputs,
gpu_inflate_status_s* outputs,
int count = 1,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);
int count,
rmm::cuda_stream_view stream);

/**
* @brief Computes the size of temporary memory for Brotli decompression
Expand All @@ -105,15 +105,15 @@ size_t get_gpu_debrotli_scratch_size(int max_num_inputs = 0);
* @param[out] outputs List of output status structures
* @param[in] scratch Temporary memory for intermediate work
* @param[in] scratch_size Size in bytes of the temporary memory
* @param[in] count Number of input/output structures, default 1
* @param[in] stream CUDA stream to use, default 0
* @param[in] count Number of input/output structures
* @param[in] stream CUDA stream to use
*/
cudaError_t gpu_debrotli(gpu_inflate_input_s* inputs,
gpu_inflate_status_s* outputs,
void* scratch,
size_t scratch_size,
int count = 1,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);
int count,
rmm::cuda_stream_view stream);

/**
* @brief Interface for compressing data with Snappy
Expand All @@ -123,13 +123,13 @@ cudaError_t gpu_debrotli(gpu_inflate_input_s* inputs,
*
* @param[in] inputs List of input argument structures
* @param[out] outputs List of output status structures
* @param[in] count Number of input/output structures, default 1
* @param[in] stream CUDA stream to use, default 0
* @param[in] count Number of input/output structures
* @param[in] stream CUDA stream to use
*/
cudaError_t gpu_snap(gpu_inflate_input_s* inputs,
gpu_inflate_status_s* outputs,
int count = 1,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);
int count,
rmm::cuda_stream_view stream);

} // namespace io
} // namespace cudf
4 changes: 2 additions & 2 deletions cpp/src/io/csv/reader_impl.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -237,7 +237,7 @@ std::pair<rmm::device_uvector<char>, selected_rows_offsets> load_data_and_gather
size_t buffer_size = std::min(max_chunk_bytes, data.size());
size_t max_blocks =
std::max<size_t>((buffer_size / cudf::io::csv::gpu::rowofs_block_bytes) + 1, 2);
hostdevice_vector<uint64_t> row_ctx(max_blocks);
hostdevice_vector<uint64_t> row_ctx(max_blocks, stream);
size_t buffer_pos = std::min(range_begin - std::min(range_begin, sizeof(char)), data.size());
size_t pos = std::min(range_begin, data.size());
size_t header_rows = (reader_opts.get_header() >= 0) ? reader_opts.get_header() + 1 : 0;
Expand Down
9 changes: 4 additions & 5 deletions cpp/src/io/csv/writer_impl.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -137,10 +137,9 @@ struct column_to_strings_fn {
(cudf::is_timestamp<column_type>()) || (cudf::is_duration<column_type>()));
}

explicit column_to_strings_fn(
csv_writer_options const& options,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
explicit column_to_strings_fn(csv_writer_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: options_(options), stream_(stream), mr_(mr)
{
}
Expand Down
7 changes: 5 additions & 2 deletions cpp/src/io/orc/timezone.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -107,10 +107,13 @@ inline __device__ int32_t get_gmt_offset(cudf::device_span<int64_t const> ttimes
return get_gmt_offset_impl(ttimes.begin(), offsets.begin(), ttimes.size(), ts);
}

struct timezone_table {
class timezone_table {
int32_t gmt_offset = 0;
rmm::device_uvector<int64_t> ttimes;
rmm::device_uvector<int32_t> offsets;

public:
// Safe to use the default stream, device_uvectors will not change after they are created empty
timezone_table() : ttimes{0, rmm::cuda_stream_default}, offsets{0, rmm::cuda_stream_default} {}
timezone_table(int32_t gmt_offset,
rmm::device_uvector<int64_t>&& ttimes,
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/orc/writer_impl.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -349,7 +349,7 @@ class writer::impl {
private:
rmm::mr::device_memory_resource* _mr = nullptr;
// Cuda stream to be used
rmm::cuda_stream_view stream = rmm::cuda_stream_default;
rmm::cuda_stream_view stream;

stripe_size_limits max_stripe_size;
size_type row_index_stride;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2021, NVIDIA CORPORATION.
* Copyright (c) 2018-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -479,7 +479,7 @@ struct dremel_data {
dremel_data get_dremel_data(column_view h_col,
rmm::device_uvector<uint8_t> const& d_nullability,
std::vector<uint8_t> const& nullability,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);
rmm::cuda_stream_view stream);

/**
* @brief Launches kernel for initializing encoder page fragments
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1427,8 +1427,8 @@ void reader::impl::decode_page_data(hostdevice_vector<gpu::ColumnChunkDesc>& chu
// In order to reduce the number of allocations of hostdevice_vector, we allocate a single vector
// to store all per-chunk pointers to nested data/nullmask. `chunk_offsets[i]` will store the
// offset into `chunk_nested_data`/`chunk_nested_valids` for the array of pointers for chunk `i`
auto chunk_nested_valids = hostdevice_vector<uint32_t*>(sum_max_depths);
auto chunk_nested_data = hostdevice_vector<void*>(sum_max_depths);
auto chunk_nested_valids = hostdevice_vector<uint32_t*>(sum_max_depths, stream);
auto chunk_nested_data = hostdevice_vector<void*>(sum_max_depths, stream);
auto chunk_offsets = std::vector<size_t>();

// Update chunks with pointers to column data.
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/writer_impl.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -206,7 +206,7 @@ class writer::impl {
// TODO : figure out if we want to keep this. It is currently unused.
rmm::mr::device_memory_resource* _mr = nullptr;
// Cuda stream to be used
rmm::cuda_stream_view stream = rmm::cuda_stream_default;
rmm::cuda_stream_view stream;

size_t max_row_group_size = default_row_group_size_bytes;
size_type max_row_group_rows = default_row_group_size_rows;
Expand Down
39 changes: 17 additions & 22 deletions cpp/src/io/utilities/column_buffer.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,11 +45,10 @@ namespace detail {
*
* @return `rmm::device_buffer` Device buffer allocation
*/
inline rmm::device_buffer create_data(
data_type type,
size_type size,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
inline rmm::device_buffer create_data(data_type type,
size_type size,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
std::size_t data_size = size_of(type) * size;

Expand All @@ -75,9 +74,9 @@ struct column_buffer {
// construct with a known size. allocates memory
column_buffer(data_type _type,
size_type _size,
bool _is_nullable = true,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
bool _is_nullable,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: type(_type), is_nullable(_is_nullable)
{
create(_size, stream, mr);
Expand All @@ -93,9 +92,7 @@ struct column_buffer {

// instantiate a column of known type with a specified size. Allows deferred creation for
// preprocessing steps such as in the Parquet reader
void create(size_type _size,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
void create(size_type _size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr);

auto data() { return _strings ? _strings->data() : _data.data(); }
auto data_size() const { return _strings ? _strings->size() : _data.size(); }
Expand Down Expand Up @@ -134,11 +131,10 @@ struct column_buffer {
*
* @return `std::unique_ptr<cudf::column>` Column from the existing device data
*/
std::unique_ptr<column> make_column(
column_buffer& buffer,
column_name_info* schema_info = nullptr,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
std::unique_ptr<column> make_column(column_buffer& buffer,
column_name_info* schema_info,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Creates an equivalent empty column from an existing set of device memory buffers.
Expand All @@ -155,11 +151,10 @@ std::unique_ptr<column> make_column(
*
* @return `std::unique_ptr<cudf::column>` Column from the existing device data
*/
std::unique_ptr<column> empty_like(
column_buffer& buffer,
column_name_info* schema_info = nullptr,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
std::unique_ptr<column> empty_like(column_buffer& buffer,
column_name_info* schema_info,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

} // namespace detail
} // namespace io
Expand Down
13 changes: 4 additions & 9 deletions cpp/src/io/utilities/hostdevice_vector.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,15 +45,12 @@ class hostdevice_vector {
return *this;
}

explicit hostdevice_vector(size_t max_size,
rmm::cuda_stream_view stream = rmm::cuda_stream_default)
explicit hostdevice_vector(size_t max_size, rmm::cuda_stream_view stream)
: hostdevice_vector(max_size, max_size, stream)
{
}

explicit hostdevice_vector(size_t initial_size,
size_t max_size,
rmm::cuda_stream_view stream = rmm::cuda_stream_default)
explicit hostdevice_vector(size_t initial_size, size_t max_size, rmm::cuda_stream_view stream)
: num_elements(initial_size), max_elements(max_size)
{
if (max_elements != 0) {
Expand Down Expand Up @@ -148,9 +145,7 @@ namespace detail {
template <typename T>
class hostdevice_2dvector {
public:
hostdevice_2dvector(size_t rows,
size_t columns,
rmm::cuda_stream_view stream = rmm::cuda_stream_default)
hostdevice_2dvector(size_t rows, size_t columns, rmm::cuda_stream_view stream)
: _size{rows, columns}, _data{rows * columns, stream}
{
}
Expand Down
Loading