Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cuio: reduce/improve kernel parms: avro #6399

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
- PR #6375 Parallelize Cython compilation in addition to Cythonization
- PR #6303 Use cudf test dtypes so timedelta tests are deterministic
- PR #6326 Simplify internal csv/json kernel parameters
- PR #6399 Simplify internal avro kernel parameters
- PR #6308 Add dictionary support to cudf::scatter with scalar
- PR #6312 Conda recipe dependency cleanup
- PR #6347 Add dictionary support to cudf::copy_range
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cudf/io/avro.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

#include "types.hpp"

#include <rmm/mr/device/per_device_resource.hpp>

#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <rmm/mr/device/per_device_resource.hpp>

#include <memory>
#include <string>
#include <vector>
Expand Down
52 changes: 26 additions & 26 deletions cpp/src/io/avro/avro.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace avro {
*
* @returns true if successful, false if error
*/
bool container::parse(file_metadata *md, size_t max_num_rows, size_t first_row)
bool container::parse(file_metadata &md, size_t max_num_rows, size_t first_row)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cudf convention is to take output parameters by pointer, so that the call site makes it clear the argument will be modified.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I just checked whether we can turn this into something along the lines of `std::pair<bool, file_metadata> parse(int, int);` as the invocation site suggests we can, but it requires combining the reader impl constructor and read functions to make it pretty. I'm saving that for another time and reverting to `file_metadata *md`.

{
uint32_t sig4, max_block_size;
size_t total_object_count;
Expand All @@ -47,19 +47,19 @@ bool container::parse(file_metadata *md, size_t max_num_rows, size_t first_row)
std::string key = get_str();
std::string value = get_str();
if (key == "avro.codec") {
md->codec = value;
md.codec = value;
} else if (key == "avro.schema") {
schema_parser sp;
if (!sp.parse(md->schema, value)) { return false; }
if (!sp.parse(md.schema, value)) { return false; }
} else {
// printf("\"%s\" = \"%s\"\n", key.c_str(), value.c_str());
md->user_data.emplace(key, value);
md.user_data.emplace(key, value);
}
}
}
for (int i = 0; i < 16; i++) { (reinterpret_cast<uint8_t *>(&md->sync_marker[0]))[i] = getb(); }
md->metadata_size = m_cur - m_base;
md->skip_rows = 0;
for (int i = 0; i < 16; i++) { (reinterpret_cast<uint8_t *>(&md.sync_marker[0]))[i] = getb(); }
md.metadata_size = m_cur - m_base;
md.skip_rows = 0;
max_block_size = 0;
total_object_count = 0;
while (m_cur + 18 < m_end && total_object_count < max_num_rows) {
Expand All @@ -70,61 +70,61 @@ bool container::parse(file_metadata *md, size_t max_num_rows, size_t first_row)
uint32_t block_row = static_cast<uint32_t>(total_object_count);
max_block_size = std::max(max_block_size, block_size);
total_object_count += object_count;
if (!md->block_list.size()) {
md->skip_rows = static_cast<uint32_t>(first_row);
if (!md.block_list.size()) {
md.skip_rows = static_cast<uint32_t>(first_row);
total_object_count -= first_row;
first_row = 0;
}
md->block_list.emplace_back(m_cur - m_base, block_size, block_row, object_count);
md.block_list.emplace_back(m_cur - m_base, block_size, block_row, object_count);
} else {
first_row -= object_count;
}
m_cur += block_size;
m_cur += 16; // TODO: Validate sync marker
}
md->max_block_size = max_block_size;
md->num_rows = total_object_count;
md->total_data_size = m_cur - (m_base + md->metadata_size);
md.max_block_size = max_block_size;
md.num_rows = total_object_count;
md.total_data_size = m_cur - (m_base + md.metadata_size);
// Extract columns
for (size_t i = 0; i < md->schema.size(); i++) {
type_kind_e kind = md->schema[i].kind;
for (size_t i = 0; i < md.schema.size(); i++) {
type_kind_e kind = md.schema[i].kind;
if (kind > type_null && kind < type_record) {
// Primitive type column
column_desc col;
int parent_idx = md->schema[i].parent_idx;
int parent_idx = md.schema[i].parent_idx;
col.schema_data_idx = (int32_t)i;
col.schema_null_idx = -1;
col.parent_union_idx = -1;
col.name = md->schema[i].name;
col.name = md.schema[i].name;
if (parent_idx >= 0) {
while (parent_idx >= 0) {
if (md->schema[parent_idx].kind == type_union) {
if (md.schema[parent_idx].kind == type_union) {
int pos = parent_idx + 1;
for (int num_children = md->schema[parent_idx].num_children; num_children > 0;
for (int num_children = md.schema[parent_idx].num_children; num_children > 0;
--num_children) {
int skip = 1;
if (pos == i) {
col.parent_union_idx = md->schema[parent_idx].num_children - num_children;
} else if (md->schema[pos].kind == type_null) {
col.parent_union_idx = md.schema[parent_idx].num_children - num_children;
} else if (md.schema[pos].kind == type_null) {
col.schema_null_idx = pos;
break;
}
do {
skip = skip + md->schema[pos].num_children - 1;
skip = skip + md.schema[pos].num_children - 1;
pos++;
} while (skip != 0);
}
}
// Ignore the root or array entries
if ((parent_idx != 0 && md->schema[parent_idx].kind != type_array) ||
if ((parent_idx != 0 && md.schema[parent_idx].kind != type_array) ||
col.name.length() == 0) {
if (col.name.length() > 0) { col.name.insert(0, 1, '.'); }
col.name.insert(0, md->schema[parent_idx].name);
col.name.insert(0, md.schema[parent_idx].name);
}
parent_idx = md->schema[parent_idx].parent_idx;
parent_idx = md.schema[parent_idx].parent_idx;
}
}
md->columns.emplace_back(std::move(col));
md.columns.emplace_back(std::move(col));
}
}
return true;
Expand Down
27 changes: 14 additions & 13 deletions cpp/src/io/avro/avro.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

#pragma once

#include "avro_common.h"

#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <map>
#include <string>
#include <vector>
#include "avro_common.h"

namespace cudf {
namespace io {
Expand Down Expand Up @@ -80,16 +81,16 @@ class schema_parser {

public:
schema_parser() {}
bool parse(std::vector<schema_entry> &schema, const std::string &str);
bool parse(std::vector<schema_entry> &schema, std::string const &str);

protected:
bool more_data() const { return (m_cur < m_end); }
std::string get_str();

protected:
const char *m_base;
const char *m_cur;
const char *m_end;
char const *m_base;
char const *m_cur;
char const *m_end;
};

/**
Expand All @@ -98,8 +99,8 @@ class schema_parser {
class container {
public:
container() { m_base = m_cur = m_end = nullptr; }
container(const uint8_t *base, size_t len) { init(base, len); }
void init(const uint8_t *base, size_t len)
container(uint8_t const *base, size_t len) { init(base, len); }
void init(uint8_t const *base, size_t len)
{
m_base = m_cur = base;
m_end = base + len;
Expand All @@ -122,21 +123,21 @@ class container {
}
std::string get_str()
{
const char *s;
char const *s;
size_t len = get_u64();
len = ((len & 1) || (m_cur >= m_end)) ? 0 : std::min(len >> 1, (size_t)(m_end - m_cur));
s = reinterpret_cast<const char *>(m_cur);
s = reinterpret_cast<char const *>(m_cur);
m_cur += len;
return std::string(s, len);
}

public:
bool parse(file_metadata *md, size_t max_num_rows = 0x7fffffff, size_t first_row = 0);
bool parse(file_metadata &md, size_t max_num_rows = 0x7fffffff, size_t first_row = 0);

protected:
const uint8_t *m_base;
const uint8_t *m_cur;
const uint8_t *m_end;
uint8_t const *m_base;
uint8_t const *m_cur;
uint8_t const *m_end;
};

} // namespace avro
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/avro/avro_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include <stdint.h>
#include <cstddef>

namespace cudf {
namespace io {
Expand All @@ -27,7 +28,7 @@ struct block_desc_s {
uint32_t size_,
uint32_t first_row_,
uint32_t num_rows_)
: offset(offset_), first_row(first_row_), num_rows(num_rows_), size(size_)
: offset(offset_), size(size_), first_row(first_row_), num_rows(num_rows_)
{
}

Expand Down
Loading