cuio: reduce/improve kernel parms: avro #6399

Closed
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -294,6 +294,7 @@
- PR #6375 Parallelize Cython compilation in addition to Cythonization
- PR #6303 Use cudf test dtypes so timedelta tests are deterministic
- PR #6326 Simplify internal csv/json kernel parameters
- PR #6399 Simplify internal avro kernel parameters
- PR #6308 Add dictionary support to cudf::scatter with scalar
- PR #6367 Add JNI bindings for byte casting
- PR #6312 Conda recipe dependency cleanup
14 changes: 7 additions & 7 deletions cpp/src/io/avro/avro.h
@@ -82,16 +82,16 @@ class schema_parser {

public:
schema_parser() {}
bool parse(std::vector<schema_entry> &schema, const std::string &str);
bool parse(std::vector<schema_entry> &schema, std::string const &str);

protected:
bool more_data() const { return (m_cur < m_end); }
std::string get_str();

protected:
const char *m_base;
const char *m_cur;
const char *m_end;
char const *m_base;
char const *m_cur;
char const *m_end;
};

/**
@@ -122,9 +122,9 @@ class container {
bool parse(file_metadata *md, size_t max_num_rows = 0x7fffffff, size_t first_row = 0);

protected:
const uint8_t *m_base;
const uint8_t *m_cur;
const uint8_t *m_end;
uint8_t const *m_base;
uint8_t const *m_cur;
uint8_t const *m_end;
};

} // namespace avro
4 changes: 2 additions & 2 deletions cpp/src/io/avro/avro_common.h
@@ -17,7 +17,7 @@
#pragma once

#include <stdint.h>
#include <stdio.h>
#include <cstddef>

namespace cudf {
namespace io {
@@ -28,7 +28,7 @@ struct block_desc_s {
uint32_t size_,
uint32_t first_row_,
uint32_t num_rows_)
: offset(offset_), first_row(first_row_), num_rows(num_rows_), size(size_)
: offset(offset_), size(size_), first_row(first_row_), num_rows(num_rows_)
{
}

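The avro_common.h hunk above makes two small cleanups: it swaps the <stdio.h> include for <cstddef>, and it reorders the constructor's member initializers to match the declaration order of block_desc_s (offset, size, first_row, num_rows). In C++ the initializer-list order never changes run-time behavior, because members are always initialized in declaration order; a mismatched list only draws a -Wreorder warning and misleads the reader. A minimal sketch of that rule, using a hypothetical struct rather than the real block_desc_s:

#include <cstdint>

// Hypothetical two-field descriptor, for illustration only.
struct desc_s {
  uint32_t offset;
  uint32_t size;

  // Written out of declaration order, `offset` would still be initialized
  // first, and compilers emit -Wreorder:
  //   desc_s(uint32_t o, uint32_t s) : size(s), offset(o) {}

  // Initializer list matching declaration order, as the hunk above now does:
  desc_s(uint32_t o, uint32_t s) : offset(o), size(s) {}
};

int main()
{
  desc_s d{/*offset=*/0, /*size=*/16};
  return d.size == 16 ? 0 : 1;
}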
118 changes: 53 additions & 65 deletions cpp/src/io/avro/avro_gpu.cu
@@ -13,10 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "avro_gpu.h"

#include <io/utilities/block_utils.cuh>

#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>

using cudf::detail::device_span;
@@ -56,28 +59,26 @@ static inline int64_t __device__ avro_decode_zigzag_varint(const uint8_t *&cur,
*
* @param[in] schema Schema description
* @param[in] schema_g Global schema in device mem
* @param[in] schema_len Number of schema entries
* @param[in] row Current row
* @param[in] max_rows Total number of rows
* @param[in] cur Current input data pointer
* @param[in] end End of input data
* @param[in] global_Dictionary Global dictionary entries
* @param[in] global_dictionary Global dictionary entries
*
* @return data pointer at the end of the row (start of next row)
*
**/
static const uint8_t *__device__ avro_decode_row(const schemadesc_s *schema,
schemadesc_s *schema_g,
uint32_t schema_len,
static const uint8_t *__device__ avro_decode_row(device_span<schemadesc_s const> schema,
device_span<schemadesc_s> schema_g,
size_t row,
size_t max_rows,
const uint8_t *cur,
const uint8_t *end,
device_span<nvstrdesc_s> global_dictionary)
uint8_t const *cur,
uint8_t const *end,
device_span<nvstrdesc_s const> global_dictionary)
{
uint32_t array_start = 0, array_repeat_count = 0;
int array_children = 0;
for (uint32_t i = 0; i < schema_len;) {
for (uint32_t i = 0; i < schema.size();) {
uint32_t kind = schema[i].kind;
int skip = 0;

@@ -87,12 +88,12 @@ static const uint8_t *__device__ avro_decode_row(const schemadesc_s *schema,
skip = (*cur++) >> 1; // NOTE: Assumes 1-byte union member
skip_after = schema[i].count - skip - 1;
++i;
while (skip > 0 && i < schema_len) {
while (skip > 0 && i < schema.size()) {
if (schema[i].kind >= type_record) { skip += schema[i].count; }
++i;
--skip;
}
if (i >= schema_len || skip_after < 0) break;
if (i >= schema.size() || skip_after < 0) break;
kind = schema[i].kind;
skip = skip_after;
}
@@ -196,7 +197,7 @@ static const uint8_t *__device__ avro_decode_row(const schemadesc_s *schema,
if (schema[i].kind >= type_record) { array_children += schema[i].count; }
}
i++;
while (skip > 0 && i < schema_len) {
while (skip > 0 && i < schema.size()) {
if (schema[i].kind >= type_record) { skip += schema[i].count; }
++i;
--skip;
@@ -218,54 +219,59 @@ static const uint8_t *__device__ avro_decode_row(const schemadesc_s *schema,
* @brief Decode column data
*
* @param[in] blocks Data block descriptions
* @param[in] schema Schema description
* @param[in] global_Dictionary Global dictionary entries
* @param[in] avro_data Raw block data
* @param[in] num_blocks Number of blocks
* @param[in] schema_len Number of entries in schema
* @param[in] schema Schema description
* @param[in] min_row_size Minimum size in bytes of a row
* @param[in] max_rows Maximum number of rows to load
* @param[in] first_row Crop all rows below first_row
*
**/
// blockDim {32,num_warps,1}
extern "C" __global__ void __launch_bounds__(num_warps * 32, 2)
gpuDecodeAvroColumnData(block_desc_s *blocks,
schemadesc_s *schema_g,
device_span<nvstrdesc_s> global_dictionary,
const uint8_t *avro_data,
uint32_t num_blocks,
uint32_t schema_len,
uint32_t min_row_size,
size_t max_rows,
size_t first_row)
decode_avro_column_data_kernel(device_span<block_desc_s const> blocks,
device_span<nvstrdesc_s const> global_dictionary,
device_span<uint8_t const> avro_data,
device_span<schemadesc_s> schema_g,
uint32_t min_row_size,
size_t max_rows,
size_t first_row)
{
__shared__ __align__(8) schemadesc_s g_shared_schema[max_shared_schema_len];
__shared__ __align__(8) block_desc_s blk_g[num_warps];

schemadesc_s *schema;
uint32_t num_blocks = blocks.size();
uint32_t num_dictionary_entries = global_dictionary.size();

device_span<schemadesc_s const> schema;
block_desc_s *const blk = &blk_g[threadIdx.y];
uint32_t block_id = blockIdx.x * num_warps + threadIdx.y;
size_t cur_row;
uint32_t rows_remaining;
const uint8_t *cur, *end;
uint8_t const *cur;
uint8_t const *end;

// Fetch schema into shared mem if possible
if (schema_len <= max_shared_schema_len) {
for (int i = threadIdx.y * 32 + threadIdx.x; i < schema_len; i += num_warps * 32) {
if (schema_g.size() <= max_shared_schema_len) {
for (int i = threadIdx.y * 32 + threadIdx.x;
i < schema_g.size() * sizeof(schemadesc_s) / sizeof(uint32_t);
i += num_warps * 32) {
g_shared_schema[i] = schema_g[i];
}
__syncthreads();
schema = g_shared_schema;
schema = device_span<schemadesc_s const>(g_shared_schema, schema_g.size());
} else {
schema = schema_g;
schema = device_span<schemadesc_s const>(schema_g.data(), schema_g.size());
}
if (block_id < num_blocks && threadIdx.x < sizeof(block_desc_s) / sizeof(uint32_t)) {
blk[threadIdx.x] = reinterpret_cast<block_desc_s const *>(&blocks[block_id])[threadIdx.x];
__threadfence_block();
}
if (block_id < num_blocks and threadIdx.x == 0) { *blk = blocks[block_id]; }
__syncthreads();
if (block_id >= num_blocks) { return; }
cur_row = blk->first_row;
rows_remaining = blk->num_rows;
cur = avro_data + blk->offset;
cur = avro_data.data() + blk->offset;
end = cur + blk->size;
while (rows_remaining > 0 && cur < end) {
uint32_t nrows;
@@ -279,14 +285,8 @@ extern "C" __global__ void __launch_bounds__(num_warps * 32, 2)
nrows = 1;
}
if (threadIdx.x < nrows) {
cur = avro_decode_row(schema,
schema_g,
schema_len,
cur_row - first_row + threadIdx.x,
max_rows,
cur,
end,
global_dictionary);
cur = avro_decode_row(
schema, schema_g, cur_row - first_row + threadIdx.x, max_rows, cur, end, global_dictionary);
}
if (nrows <= 1) {
cur = start + shuffle(static_cast<uint32_t>(cur - start));
@@ -303,41 +303,29 @@ extern "C" __global__ void __launch_bounds__(num_warps * 32, 2)
* @brief Launches kernel for decoding column data
*
* @param[in] blocks Data block descriptions
* @param[in] schema Schema description
* @param[in] global_dictionary Global dictionary entries
* @param[in] avro_data Raw block data
* @param[in] num_blocks Number of blocks
* @param[in] schema_len Number of entries in schema
* @param[in] schema Schema description
* @param[in] max_rows Maximum number of rows to load
* @param[in] first_row Crop all rows below first_row
* @param[in] min_row_size Minimum size in bytes of a row
* @param[in] stream CUDA stream to use, default 0
**/
void DecodeAvroColumnData(block_desc_s *blocks,
schemadesc_s *schema,
device_span<nvstrdesc_s> global_dictionary,
const uint8_t *avro_data,
uint32_t num_blocks,
uint32_t schema_len,
size_t max_rows,
size_t first_row,
uint32_t min_row_size,
rmm::cuda_stream_view stream)
void __host__ decode_avro_column_data(device_span<block_desc_s const> const blocks,
device_span<nvstrdesc_s const> const global_dictionary,
device_span<uint8_t const> const avro_data,
device_span<schemadesc_s> const schema,
size_t max_rows,
size_t first_row,
uint32_t min_row_size,
rmm::cuda_stream_view stream)
{
// num_warps warps per threadblock
dim3 const dim_block(32, num_warps);
dim3 dim_block(32, num_warps);
// 1 warp per datablock, num_warps datablocks per threadblock
dim3 const dim_grid((num_blocks + num_warps - 1) / num_warps, 1);

gpuDecodeAvroColumnData<<<dim_grid, dim_block, 0, stream.value()>>>(blocks,
schema,
global_dictionary,
avro_data,
num_blocks,
schema_len,
min_row_size,
max_rows,
first_row);
dim3 dim_grid((blocks.size() + num_warps - 1) / num_warps, 1);
decode_avro_column_data_kernel<<<dim_grid, dim_block, 0, stream.value()>>>(
blocks, global_dictionary, avro_data, schema, min_row_size, max_rows, first_row);
}

} // namespace gpu
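The kernel changes above are the core of the PR: each raw pointer that used to travel with a separate count (blocks/num_blocks, schema_g/schema_len, avro_data plus its implicit length) now arrives as a single device_span, and the counts are recovered inside the kernel via .size(). The code below is a minimal span-like type to illustrate the idea; it is not cudf's device_span (which comes from <cudf/utilities/span.hpp>), just a sketch of the pointer-plus-size bundling the refactor relies on:

#include <cstddef>

// Minimal illustrative view over device memory; cudf's device_span exposes the
// same data()/size() surface, plus more.
template <typename T>
class tiny_span {
 public:
  __host__ __device__ tiny_span(T *data, std::size_t size) : _data(data), _size(size) {}
  __host__ __device__ T *data() const { return _data; }
  __host__ __device__ std::size_t size() const { return _size; }
  __host__ __device__ T &operator[](std::size_t i) const { return _data[i]; }

 private:
  T *_data;
  std::size_t _size;
};

// One argument now carries both the pointer and the element count, so the
// kernel cannot be launched with a mismatched pointer/length pair.
__global__ void count_nonzero(tiny_span<int const> values, int *result)
{
  for (std::size_t i = threadIdx.x; i < values.size(); i += blockDim.x) {
    if (values[i] != 0) { atomicAdd(result, 1); }
  }
}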
31 changes: 14 additions & 17 deletions cpp/src/io/avro/avro_gpu.h
@@ -19,6 +19,7 @@

#include <cudf/utilities/span.hpp>

using cudf::detail::device_span;
#include <rmm/cuda_stream_view.hpp>

namespace cudf {
@@ -27,15 +28,15 @@ namespace avro {
namespace gpu {
/**
* @brief Struct to describe the output of a string datatype
*/
**/
struct nvstrdesc_s {
const char *ptr;
char const *ptr;
size_t count;
};

/**
* @brief Struct to describe the avro schema
*/
**/
struct schemadesc_s {
uint32_t kind; // avro type kind
uint32_t count; // for records/unions: number of following child columns, for nulls: global
@@ -47,26 +48,22 @@ struct schemadesc_s {
* @brief Launches kernel for decoding column data
*
* @param[in] blocks Data block descriptions
* @param[in] schema Schema description
* @param[in] global_dictionary Global dictionary entries
* @param[in] avro_data Raw block data
* @param[in] num_blocks Number of blocks
* @param[in] schema_len Number of entries in schema
* @param[in] schema Schema description
* @param[in] max_rows Maximum number of rows to load
* @param[in] first_row Crop all rows below first_row
* @param[in] min_row_size Minimum size in bytes of a row
* @param[in] stream CUDA stream to use, default 0
*/
void DecodeAvroColumnData(block_desc_s *blocks,
schemadesc_s *schema,
cudf::detail::device_span<nvstrdesc_s> global_dictionary,
const uint8_t *avro_data,
uint32_t num_blocks,
uint32_t schema_len,
size_t max_rows = ~0,
size_t first_row = 0,
uint32_t min_row_size = 0,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);
**/
void decode_avro_column_data(device_span<block_desc_s const> blocks,
device_span<nvstrdesc_s const> global_dictionary,
device_span<uint8_t const> avro_data,
device_span<schemadesc_s> schema,
size_t max_rows,
size_t first_row,
uint32_t min_row_size,
rmm::cuda_stream_view stream);

} // namespace gpu
} // namespace avro
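For reference, a hypothetical call site for the renamed host entry point declared above (previously DecodeAvroColumnData, now decode_avro_column_data). The wrapper function, buffer names, and the construction of device_span from a raw pointer plus element count are assumptions for illustration only; the actual reader code updated by this PR may look different:

#include "avro_gpu.h"  // assumed to pull in block_desc_s, nvstrdesc_s, schemadesc_s

#include <cudf/utilities/span.hpp>
#include <rmm/cuda_stream_view.hpp>

using cudf::detail::device_span;
using namespace cudf::io::avro;       // block_desc_s
using namespace cudf::io::avro::gpu;  // nvstrdesc_s, schemadesc_s, decode_avro_column_data

// Hypothetical wrapper: the d_* arguments are device pointers prepared elsewhere.
void decode_all_blocks(block_desc_s const *d_blocks, size_t num_blocks,
                       nvstrdesc_s const *d_dictionary, size_t num_dictionary_entries,
                       uint8_t const *d_avro_data, size_t avro_data_size,
                       schemadesc_s *d_schema, size_t schema_len,
                       size_t max_rows, size_t first_row, uint32_t min_row_size,
                       rmm::cuda_stream_view stream)
{
  // Old: DecodeAvroColumnData(d_blocks, d_schema, {d_dictionary, num_dictionary_entries},
  //                           d_avro_data, num_blocks, schema_len, max_rows, first_row,
  //                           min_row_size, stream);
  // New: every pointer/length pair becomes a single device_span argument, and the
  // previously defaulted parameters are passed explicitly.
  decode_avro_column_data(device_span<block_desc_s const>(d_blocks, num_blocks),
                          device_span<nvstrdesc_s const>(d_dictionary, num_dictionary_entries),
                          device_span<uint8_t const>(d_avro_data, avro_data_size),
                          device_span<schemadesc_s>(d_schema, schema_len),
                          max_rows, first_row, min_row_size, stream);
}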