Skip to content

Commit

Permalink
[new-feature](complex-type) support read nested parquet and orc file …
Browse files Browse the repository at this point in the history
…with complex type (apache#22793)
  • Loading branch information
amorynan committed Aug 20, 2023
1 parent b3b2ffb commit 0c1b2bd
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 25 deletions.
34 changes: 9 additions & 25 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,25 +123,13 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
size_t max_buf_size) {
if (field->type.type == TYPE_ARRAY) {
std::unique_ptr<ParquetColumnReader> element_reader;
if (field->children[0].type.type == TYPE_MAP ||
field->children[0].type.type == TYPE_STRUCT) {
return Status::InternalError(
"Array does not support nested map/struct type in column {}", field->name);
}
RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
element_reader, max_buf_size));
element_reader->set_nested_column();
ArrayColumnReader* array_reader = new ArrayColumnReader(row_ranges, ctz, io_ctx);
auto array_reader = ArrayColumnReader::create_unique(row_ranges, ctz, io_ctx);
RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field));
reader.reset(array_reader);
reader.reset(array_reader.release());
} else if (field->type.type == TYPE_MAP) {
auto key_type = field->children[0].children[0].type.type;
auto value_type = field->children[0].children[1].type.type;
if (key_type == TYPE_ARRAY || key_type == TYPE_MAP || key_type == TYPE_STRUCT ||
value_type == TYPE_ARRAY || value_type == TYPE_MAP || value_type == TYPE_STRUCT) {
return Status::InternalError("Map does not support nested complex type in column {}",
field->name);
}
std::unique_ptr<ParquetColumnReader> key_reader;
std::unique_ptr<ParquetColumnReader> value_reader;
RETURN_IF_ERROR(create(file, &field->children[0].children[0], row_group, row_ranges, ctz,
Expand All @@ -150,31 +138,27 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
io_ctx, value_reader, max_buf_size));
key_reader->set_nested_column();
value_reader->set_nested_column();
MapColumnReader* map_reader = new MapColumnReader(row_ranges, ctz, io_ctx);
auto map_reader = MapColumnReader::create_unique(row_ranges, ctz, io_ctx);
RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field));
reader.reset(map_reader);
reader.reset(map_reader.release());
} else if (field->type.type == TYPE_STRUCT) {
std::vector<std::unique_ptr<ParquetColumnReader>> child_readers;
child_readers.reserve(field->children.size());
for (int i = 0; i < field->children.size(); ++i) {
auto child_type = field->children[i].type.type;
if (child_type == TYPE_ARRAY || child_type == TYPE_MAP || child_type == TYPE_STRUCT) {
return Status::InternalError(
"Struct does not support nested complex type in column {}", field->name);
}
std::unique_ptr<ParquetColumnReader> child_reader;
RETURN_IF_ERROR(create(file, &field->children[i], row_group, row_ranges, ctz, io_ctx,
child_reader, max_buf_size));
child_reader->set_nested_column();
child_readers.emplace_back(std::move(child_reader));
}
StructColumnReader* struct_reader = new StructColumnReader(row_ranges, ctz, io_ctx);
auto struct_reader = StructColumnReader::create_unique(row_ranges, ctz, io_ctx);
RETURN_IF_ERROR(struct_reader->init(std::move(child_readers), field));
reader.reset(struct_reader);
reader.reset(struct_reader.release());
} else {
const tparquet::ColumnChunk& chunk = row_group.columns[field->physical_column_index];
ScalarColumnReader* scalar_reader = new ScalarColumnReader(row_ranges, chunk, ctz, io_ctx);
auto scalar_reader = ScalarColumnReader::create_unique(row_ranges, chunk, ctz, io_ctx);
RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size));
reader.reset(scalar_reader);
reader.reset(scalar_reader.release());
}
return Status::OK();
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ class ParquetColumnReader {
};

class ScalarColumnReader : public ParquetColumnReader {
ENABLE_FACTORY_CREATOR(ScalarColumnReader)
public:
ScalarColumnReader(const std::vector<RowRange>& row_ranges,
const tparquet::ColumnChunk& chunk_meta, cctz::time_zone* ctz,
Expand Down Expand Up @@ -195,6 +196,7 @@ class ScalarColumnReader : public ParquetColumnReader {
};

class ArrayColumnReader : public ParquetColumnReader {
ENABLE_FACTORY_CREATOR(ArrayColumnReader)
public:
ArrayColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
io::IOContext* io_ctx)
Expand All @@ -218,6 +220,7 @@ class ArrayColumnReader : public ParquetColumnReader {
};

class MapColumnReader : public ParquetColumnReader {
ENABLE_FACTORY_CREATOR(MapColumnReader)
public:
MapColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
io::IOContext* io_ctx)
Expand Down Expand Up @@ -252,6 +255,7 @@ class MapColumnReader : public ParquetColumnReader {
};

class StructColumnReader : public ParquetColumnReader {
ENABLE_FACTORY_CREATOR(StructColumnReader)
public:
StructColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
io::IOContext* io_ctx)
Expand Down

0 comments on commit 0c1b2bd

Please sign in to comment.