Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add one-level list encoding support in parquet reader #9848

Merged
merged 14 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cpp/include/cudf/io/detail/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,10 @@ class reader {
*
* @param sources Input `datasource` objects to read the dataset from
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
explicit reader(std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ constexpr size_type default_row_group_size_rows = 1000000;
class parquet_reader_options_builder;

/**
* @brief Settings or `read_parquet()`.
* @brief Settings for `read_parquet()`.
*/
class parquet_reader_options {
source_info _source;
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,7 @@ table_with_metadata read_parquet(parquet_reader_options const& options,
CUDF_FUNC_RANGE();

auto datasources = make_datasources(options.get_source());
auto reader = std::make_unique<detail_parquet::reader>(
std::move(datasources), options, rmm::cuda_stream_default, mr);
auto reader = std::make_unique<detail_parquet::reader>(std::move(datasources), options, mr);

return reader->read(options);
}
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1557,10 +1557,10 @@ extern "C" __global__ void __launch_bounds__(block_size)

bool has_repetition = s->col.max_level[level_type::REPETITION] > 0;

// optimization : it might be useful to have a version of gpuDecodeStream that could go
// wider than 1 warp. Currently it only only uses 1 warp so that it can overlap work
// with the value decoding step when in the actual value decoding kernel. however during
// this preprocess step we have no such limits - we could go as wide as block_size
// optimization : it might be useful to have a version of gpuDecodeStream that could go wider than
// 1 warp. Currently it only uses 1 warp so that it can overlap work with the value decoding step
// when in the actual value decoding kernel. However, during this preprocess step we have no such
// limits - we could go as wide as block_size
if (t < 32) {
constexpr int batch_size = 32;
int target_input_count = batch_size;
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ struct SchemaElement {
// };
// }
// A stub is a REPEATED node wrapping exactly one child; callers walk past
// such nodes when computing nesting depth (they carry no data themselves).
bool is_stub() const { return num_children == 1 && repetition_type == REPEATED; }

// https://github.com/apache/parquet-cpp/blob/642da05/src/parquet/schema.h#L49-L50
// One-level LIST encoding: Only allows required lists with required cells:
//   repeated value_type name
// A one-level list element is REPEATED and is itself the leaf (no children).
bool is_one_level_list() const { return repetition_type == REPEATED && num_children == 0; }

// in parquet terms, a group is a level of nesting in the schema. a group
// can be a struct or a list
bool is_struct() const
Expand Down
45 changes: 39 additions & 6 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,16 @@ type_id to_type_id(SchemaElement const& schema,
return type_id::EMPTY;
}

/**
 * @brief Converts a cuDF type enum to a column logical type.
 *
 * For decimal types the column scale is taken (negated) from the parquet
 * schema element; all other types carry no extra type parameters.
 *
 * @param t_id cuDF type id to wrap
 * @param schema Parquet schema element supplying the decimal scale
 * @return The resulting cuDF `data_type`
 */
data_type to_data_type(type_id t_id, SchemaElement const& schema)
{
  bool const is_decimal =
    t_id == type_id::DECIMAL32 || t_id == type_id::DECIMAL64 || t_id == type_id::DECIMAL128;
  if (is_decimal) { return data_type{t_id, numeric::scale_type{-schema.decimal_scale}}; }
  return data_type{t_id};
}

/**
* @brief Function that returns the required the number of bits to store a value
*/
Expand Down Expand Up @@ -414,6 +424,9 @@ class aggregate_metadata {
// walk upwards, skipping repeated fields
while (schema_index > 0) {
if (!pfm.schema[schema_index].is_stub()) { depth++; }
// schema of one-level encoding list doesn't contain nesting information, so we need to
// manually add an extra nesting level
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
if (pfm.schema[schema_index].is_one_level_list()) { depth++; }
schema_index = pfm.schema[schema_index].parent_idx;
}
return depth;
Expand Down Expand Up @@ -596,11 +609,11 @@ class aggregate_metadata {
}

// if we're at the root, this is a new output column
auto const col_type = to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
auto const dtype = col_type == type_id::DECIMAL32 || col_type == type_id::DECIMAL64 ||
col_type == type_id::DECIMAL128
? data_type{col_type, numeric::scale_type{-schema_elem.decimal_scale}}
: data_type{col_type};
auto const col_type =
schema_elem.is_one_level_list()
? type_id::LIST
: to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
auto const dtype = to_data_type(col_type, schema_elem);

column_buffer output_col(dtype, schema_elem.repetition_type == OPTIONAL);
// store the index of this element if inserted in out_col_array
Expand Down Expand Up @@ -630,6 +643,23 @@ class aggregate_metadata {
if (schema_elem.num_children == 0) {
input_column_info& input_col =
input_columns.emplace_back(input_column_info{schema_idx, schema_elem.name});

// set up child output column for one-level encoding list
if (schema_elem.is_one_level_list()) {
// determine the element data type
auto const element_type =
to_type_id(schema_elem, strings_to_categorical, timestamp_type_id);
auto const element_dtype = to_data_type(element_type, schema_elem);

column_buffer element_col(element_dtype, schema_elem.repetition_type == OPTIONAL);
// store the index of this element
nesting.push_back(static_cast<int>(output_col.children.size()));
// TODO: not sure if we should assign a name or leave it blank
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the name included in the output schema?
In ORC the names of nested columns are generated as the index in the parent's list of children. Gives a uniform way to access nested columns of lists/maps/structs. I don't know enough about Parquet to understand if the same logic can apply here.

Copy link
Member Author

@PointKernel PointKernel Dec 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is really little spec information I can find for this one-level list encoding, not to mention how to name the elements. I will check parquet-cpp to see how they handle it and remove TODO whenever I find a proper fix (which may take some certain time). Probably in a follow-up PR.

element_col.name = "element";

output_col.children.push_back(std::move(element_col));
}

std::copy(nesting.cbegin(), nesting.cend(), std::back_inserter(input_col.nesting));
path_is_valid = true; // If we're able to reach leaf then path is valid
}
Expand Down Expand Up @@ -850,6 +880,10 @@ void generate_depth_remappings(std::map<int, std::pair<std::vector<int>, std::ve
// if this is a repeated field, map it one level deeper
shallowest = cur_schema.is_stub() ? cur_depth + 1 : cur_depth;
}
// if it's one-level encoding list
else if (cur_schema.is_one_level_list()) {
shallowest = cur_depth - 1;
}
if (!cur_schema.is_stub()) { cur_depth--; }
schema_idx = cur_schema.parent_idx;
}
Expand Down Expand Up @@ -1770,7 +1804,6 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Forward to implementation
reader::reader(std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: _impl(std::make_unique<impl>(std::move(sources), options, mr))
{
Expand Down
Binary file not shown.
9 changes: 9 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2269,6 +2269,15 @@ def test_parquet_reader_brotli(datadir):
assert_eq(expect, got)


def test_parquet_reader_one_level_list(datadir):
    # A file written with one-level list encoding must round-trip through
    # cudf identically to the pandas (pyarrow) reader.
    path = datadir / "one_level_list.parquet"

    expected = pd.read_parquet(path)
    actual = cudf.read_parquet(path).to_pandas(nullable=True)

    assert_eq(expected, actual)


@pytest.mark.parametrize("size_bytes", [4_000_000, 1_000_000, 600_000])
@pytest.mark.parametrize("size_rows", [1_000_000, 100_000, 10_000])
def test_parquet_writer_row_group_size(
Expand Down