Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[new-feature](complex-type) support reading nested Parquet and ORC files with complex types #22793

Merged
merged 3 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 9 additions & 25 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,25 +123,13 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
size_t max_buf_size) {
if (field->type.type == TYPE_ARRAY) {
std::unique_ptr<ParquetColumnReader> element_reader;
if (field->children[0].type.type == TYPE_MAP ||
field->children[0].type.type == TYPE_STRUCT) {
return Status::InternalError(
"Array does not support nested map/struct type in column {}", field->name);
}
RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
element_reader, max_buf_size));
element_reader->set_nested_column();
ArrayColumnReader* array_reader = new ArrayColumnReader(row_ranges, ctz, io_ctx);
auto array_reader = ArrayColumnReader::create_unique(row_ranges, ctz, io_ctx);
RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field));
reader.reset(array_reader);
reader.reset(array_reader.release());
} else if (field->type.type == TYPE_MAP) {
auto key_type = field->children[0].children[0].type.type;
auto value_type = field->children[0].children[1].type.type;
if (key_type == TYPE_ARRAY || key_type == TYPE_MAP || key_type == TYPE_STRUCT ||
value_type == TYPE_ARRAY || value_type == TYPE_MAP || value_type == TYPE_STRUCT) {
return Status::InternalError("Map does not support nested complex type in column {}",
field->name);
}
std::unique_ptr<ParquetColumnReader> key_reader;
std::unique_ptr<ParquetColumnReader> value_reader;
RETURN_IF_ERROR(create(file, &field->children[0].children[0], row_group, row_ranges, ctz,
Expand All @@ -150,31 +138,27 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
io_ctx, value_reader, max_buf_size));
key_reader->set_nested_column();
value_reader->set_nested_column();
MapColumnReader* map_reader = new MapColumnReader(row_ranges, ctz, io_ctx);
auto map_reader = MapColumnReader::create_unique(row_ranges, ctz, io_ctx);
RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field));
reader.reset(map_reader);
reader.reset(map_reader.release());
} else if (field->type.type == TYPE_STRUCT) {
std::vector<std::unique_ptr<ParquetColumnReader>> child_readers;
child_readers.reserve(field->children.size());
for (int i = 0; i < field->children.size(); ++i) {
auto child_type = field->children[i].type.type;
if (child_type == TYPE_ARRAY || child_type == TYPE_MAP || child_type == TYPE_STRUCT) {
return Status::InternalError(
"Struct does not support nested complex type in column {}", field->name);
}
std::unique_ptr<ParquetColumnReader> child_reader;
RETURN_IF_ERROR(create(file, &field->children[i], row_group, row_ranges, ctz, io_ctx,
child_reader, max_buf_size));
child_reader->set_nested_column();
child_readers.emplace_back(std::move(child_reader));
amorynan marked this conversation as resolved.
Show resolved Hide resolved
}
StructColumnReader* struct_reader = new StructColumnReader(row_ranges, ctz, io_ctx);
auto struct_reader = StructColumnReader::create_unique(row_ranges, ctz, io_ctx);
RETURN_IF_ERROR(struct_reader->init(std::move(child_readers), field));
reader.reset(struct_reader);
reader.reset(struct_reader.release());
} else {
const tparquet::ColumnChunk& chunk = row_group.columns[field->physical_column_index];
ScalarColumnReader* scalar_reader = new ScalarColumnReader(row_ranges, chunk, ctz, io_ctx);
auto scalar_reader = ScalarColumnReader::create_unique(row_ranges, chunk, ctz, io_ctx);
RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size));
reader.reset(scalar_reader);
reader.reset(scalar_reader.release());
}
return Status::OK();
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ class ParquetColumnReader {
};

class ScalarColumnReader : public ParquetColumnReader {
ENABLE_FACTORY_CREATOR(ScalarColumnReader)
public:
ScalarColumnReader(const std::vector<RowRange>& row_ranges,
const tparquet::ColumnChunk& chunk_meta, cctz::time_zone* ctz,
Expand Down Expand Up @@ -195,6 +196,7 @@ class ScalarColumnReader : public ParquetColumnReader {
};

class ArrayColumnReader : public ParquetColumnReader {
ENABLE_FACTORY_CREATOR(ArrayColumnReader)
public:
ArrayColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
io::IOContext* io_ctx)
Expand All @@ -218,6 +220,7 @@ class ArrayColumnReader : public ParquetColumnReader {
};

class MapColumnReader : public ParquetColumnReader {
ENABLE_FACTORY_CREATOR(MapColumnReader)
public:
MapColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
io::IOContext* io_ctx)
Expand Down Expand Up @@ -252,6 +255,7 @@ class MapColumnReader : public ParquetColumnReader {
};

class StructColumnReader : public ParquetColumnReader {
ENABLE_FACTORY_CREATOR(StructColumnReader)
public:
StructColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
io::IOContext* io_ctx)
Expand Down