Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](parquet) parquet reader confuses logical/physical/slot id of columns #23198

Merged
merged 1 commit into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions be/src/vec/exec/format/parquet/parquet_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,6 @@ struct RowRange {
}
};

struct ParquetReadColumn {
ParquetReadColumn(int parquet_col_id, const std::string& file_slot_name)
: _parquet_col_id(parquet_col_id), _file_slot_name(file_slot_name) {};

int _parquet_col_id;
const std::string& _file_slot_name;
};

#pragma pack(1)
struct ParquetInt96 {
uint64_t lo; // time of nanoseconds in a day
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ class ParquetColumnReader {
const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
io::IOContext* io_ctx, std::unique_ptr<ParquetColumnReader>& reader,
size_t max_buf_size);
void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index = offset_index; }
void set_nested_column() { _nested_column = true; }
virtual const std::vector<level_t>& get_rep_level() const = 0;
virtual const std::vector<level_t>& get_def_level() const = 0;
Expand All @@ -149,7 +148,6 @@ class ParquetColumnReader {
const std::vector<RowRange>& _row_ranges;
cctz::time_zone* _ctz;
io::IOContext* _io_ctx;
tparquet::OffsetIndex* _offset_index;
int64_t _current_row_index = 0;
int _row_range_index = 0;
int64_t _decode_null_map_time = 0;
Expand Down
11 changes: 3 additions & 8 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ namespace doris::vectorized {
const std::vector<int64_t> RowGroupReader::NO_DELETE = {};

RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
const std::vector<ParquetReadColumn>& read_columns,
const std::vector<std::string>& read_columns,
const int32_t row_group_id, const tparquet::RowGroup& row_group,
cctz::time_zone* ctz, io::IOContext* io_ctx,
const PositionDeleteContext& position_delete_ctx,
Expand Down Expand Up @@ -126,21 +126,16 @@ Status RowGroupReader::init(
const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size());
for (auto& read_col : _read_columns) {
auto field = const_cast<FieldSchema*>(schema.get_column(read_col._file_slot_name));
auto field = const_cast<FieldSchema*>(schema.get_column(read_col));
std::unique_ptr<ParquetColumnReader> reader;
RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, _row_group_meta,
_read_ranges, _ctz, _io_ctx, reader,
max_buf_size));
auto col_iter = col_offsets.find(read_col._parquet_col_id);
if (col_iter != col_offsets.end()) {
tparquet::OffsetIndex oi = col_iter->second;
reader->add_offset_index(&oi);
}
if (reader == nullptr) {
VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed";
return Status::Corruption("Init row group reader failed");
}
_column_readers[read_col._file_slot_name] = std::move(reader);
_column_readers[read_col] = std::move(reader);
}
// Check if single slot can be filtered by dict.
if (!_slot_id_to_filter_conjuncts) {
Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ class RowGroupReader {
PositionDeleteContext(const PositionDeleteContext& filter) = default;
};

RowGroupReader(io::FileReaderSPtr file_reader,
const std::vector<ParquetReadColumn>& read_columns, const int32_t row_group_id,
const tparquet::RowGroup& row_group, cctz::time_zone* ctz, io::IOContext* io_ctx,
RowGroupReader(io::FileReaderSPtr file_reader, const std::vector<std::string>& read_columns,
const int32_t row_group_id, const tparquet::RowGroup& row_group,
cctz::time_zone* ctz, io::IOContext* io_ctx,
const PositionDeleteContext& position_delete_ctx,
const LazyReadContext& lazy_read_ctx, RuntimeState* state);

Expand Down Expand Up @@ -191,7 +191,7 @@ class RowGroupReader {

io::FileReaderSPtr _file_reader;
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers;
const std::vector<ParquetReadColumn>& _read_columns;
const std::vector<std::string>& _read_columns;
const int32_t _row_group_id;
const tparquet::RowGroup& _row_group_meta;
int64_t _remaining_rows;
Expand Down
97 changes: 49 additions & 48 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,22 +322,28 @@ Status ParquetReader::init_reader(
// e.g. table added a column after this parquet file was written.
_column_names = &all_column_names;
auto schema_desc = _file_metadata->schema();
std::set<std::string> required_columns(all_column_names.begin(), all_column_names.end());
// Currently only used in iceberg, the columns are dropped but added back
std::set<std::string> dropped_columns(missing_column_names.begin(), missing_column_names.end());
// Make the order of read columns the same as physical order in parquet file
for (int i = 0; i < schema_desc.size(); ++i) {
auto name = schema_desc.get_column(i)->name;
// If the column in parquet file is included in all_column_names and not in missing_column_names,
// add it to _map_column, which means the reader should read the data of this column.
// Here to check against missing_column_names is for the 'Add a column back to the table
// with the same column name' case. (drop column a then add column a).
// Shouldn't read this column data in this case.
if (find(all_column_names.begin(), all_column_names.end(), name) !=
all_column_names.end() &&
find(missing_column_names.begin(), missing_column_names.end(), name) ==
missing_column_names.end()) {
_map_column.emplace(name, i);
if (required_columns.find(name) != required_columns.end() &&
dropped_columns.find(name) == dropped_columns.end()) {
required_columns.erase(name);
_read_columns.emplace_back(name);
}
}
for (const std::string& name : required_columns) {
_missing_cols.emplace_back(name);
}

_colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_init_read_columns());
// build column predicates for column lazy read
_lazy_read_ctx.conjuncts = conjuncts;
RETURN_IF_ERROR(_init_row_groups(filter_groups));
Expand Down Expand Up @@ -394,15 +400,15 @@ Status ParquetReader::set_fill_columns(

const FieldDescriptor& schema = _file_metadata->schema();
for (auto& read_col : _read_columns) {
_lazy_read_ctx.all_read_columns.emplace_back(read_col._file_slot_name);
PrimitiveType column_type = schema.get_column(read_col._file_slot_name)->type.type;
_lazy_read_ctx.all_read_columns.emplace_back(read_col);
PrimitiveType column_type = schema.get_column(read_col)->type.type;
if (column_type == TYPE_ARRAY || column_type == TYPE_MAP || column_type == TYPE_STRUCT) {
_has_complex_type = true;
}
if (predicate_columns.size() > 0) {
auto iter = predicate_columns.find(read_col._file_slot_name);
auto iter = predicate_columns.find(read_col);
if (iter == predicate_columns.end()) {
_lazy_read_ctx.lazy_read_columns.emplace_back(read_col._file_slot_name);
_lazy_read_ctx.lazy_read_columns.emplace_back(read_col);
} else {
_lazy_read_ctx.predicate_columns.first.emplace_back(iter->first);
_lazy_read_ctx.predicate_columns.second.emplace_back(iter->second.second);
Expand Down Expand Up @@ -450,29 +456,6 @@ Status ParquetReader::set_fill_columns(
return Status::OK();
}

Status ParquetReader::_init_read_columns() {
std::vector<int> include_column_ids;
for (auto& file_col_name : *_column_names) {
auto iter = _map_column.find(file_col_name);
if (iter != _map_column.end()) {
include_column_ids.emplace_back(iter->second);
} else {
_missing_cols.push_back(file_col_name);
}
}
// It is legal to get empty include_column_ids in query task.
if (include_column_ids.empty()) {
return Status::OK();
}
// The same order as physical columns
std::sort(include_column_ids.begin(), include_column_ids.end());
for (int& parquet_col_id : include_column_ids) {
_read_columns.emplace_back(parquet_col_id,
_file_metadata->schema().get_column(parquet_col_id)->name);
}
return Status::OK();
}

std::unordered_map<std::string, TypeDescriptor> ParquetReader::get_name_to_type() {
std::unordered_map<std::string, TypeDescriptor> map;
const auto& schema_desc = _file_metadata->schema();
Expand Down Expand Up @@ -643,11 +626,24 @@ Status ParquetReader::_init_row_groups(const bool& is_filter_groups) {
RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group));
}
int64_t group_size = 0; // only calculate the needed columns
for (auto& read_col : _read_columns) {
auto& parquet_col_id = read_col._parquet_col_id;
if (row_group.columns[parquet_col_id].__isset.meta_data) {
group_size += row_group.columns[parquet_col_id].meta_data.total_compressed_size;
std::function<int64_t(const FieldSchema*)> column_compressed_size =
[&row_group, &column_compressed_size](const FieldSchema* field) -> int64_t {
if (field->physical_column_index >= 0) {
int parquet_col_id = field->physical_column_index;
if (row_group.columns[parquet_col_id].__isset.meta_data) {
return row_group.columns[parquet_col_id].meta_data.total_compressed_size;
}
return 0;
}
int64_t size = 0;
for (const FieldSchema& child : field->children) {
size += column_compressed_size(&child);
}
return size;
};
for (auto& read_col : _read_columns) {
const FieldSchema* field = _file_metadata->schema().get_column(read_col);
group_size += column_compressed_size(field);
}
if (!filter_group) {
_read_row_groups.emplace_back(row_group_idx, row_index, row_index + row_group.num_rows);
Expand Down Expand Up @@ -703,7 +699,7 @@ std::vector<io::PrefetchRange> ParquetReader::_generate_random_access_ranges(
};
const tparquet::RowGroup& row_group = _t_metadata->row_groups[group.row_group_id];
for (const auto& read_col : _read_columns) {
const FieldSchema* field = _file_metadata->schema().get_column(read_col._file_slot_name);
const FieldSchema* field = _file_metadata->schema().get_column(read_col);
scalar_range(field, row_group);
}
if (!result.empty()) {
Expand Down Expand Up @@ -766,11 +762,16 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
// read twice: parse column index & parse offset index
_column_statistics.meta_read_calls += 2;
for (auto& read_col : _read_columns) {
auto conjunct_iter = _colname_to_value_range->find(read_col._file_slot_name);
auto conjunct_iter = _colname_to_value_range->find(read_col);
if (_colname_to_value_range->end() == conjunct_iter) {
continue;
}
auto& chunk = row_group.columns[read_col._parquet_col_id];
int parquet_col_id = _file_metadata->schema().get_column(read_col)->physical_column_index;
if (parquet_col_id < 0) {
// complex type, not support page index yet.
continue;
}
auto& chunk = row_group.columns[parquet_col_id];
if (chunk.column_index_offset == 0 && chunk.column_index_length == 0) {
continue;
}
Expand All @@ -782,7 +783,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
}
auto& conjuncts = conjunct_iter->second;
std::vector<int> skipped_page_range;
const FieldSchema* col_schema = schema_desc.get_column(read_col._file_slot_name);
const FieldSchema* col_schema = schema_desc.get_column(read_col);
page_index.collect_skipped_page_range(&column_index, conjuncts, col_schema,
skipped_page_range, *_ctz);
if (skipped_page_range.empty()) {
Expand All @@ -797,7 +798,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
// use the union row range
skipped_row_ranges.emplace_back(skipped_row_range);
}
_col_offsets.emplace(read_col._parquet_col_id, offset_index);
_col_offsets.emplace(parquet_col_id, offset_index);
}
if (skipped_row_ranges.empty()) {
read_whole_row_group();
Expand Down Expand Up @@ -849,16 +850,16 @@ Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::Co
return Status::OK();
}
auto& schema_desc = _file_metadata->schema();
for (auto& col_name : *_column_names) {
auto col_iter = _map_column.find(col_name);
if (col_iter == _map_column.end()) {
continue;
}
for (auto& col_name : _read_columns) {
auto slot_iter = _colname_to_value_range->find(col_name);
if (slot_iter == _colname_to_value_range->end()) {
continue;
}
int parquet_col_id = col_iter->second;
int parquet_col_id = _file_metadata->schema().get_column(col_name)->physical_column_index;
if (parquet_col_id < 0) {
// complex type, not support filter yet.
continue;
}
auto& meta_data = columns[parquet_col_id].meta_data;
auto& statistic = meta_data.statistics;
bool is_all_null =
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/exec/format/parquet/vparquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ class ParquetReader : public GenericReader {
RowGroupReader::PositionDeleteContext _get_position_delete_ctx(
const tparquet::RowGroup& row_group,
const RowGroupReader::RowGroupIndex& row_group_index);
Status _init_read_columns();
Status _init_row_groups(const bool& is_filter_groups);
void _init_system_properties();
void _init_file_description();
Expand Down Expand Up @@ -226,12 +225,11 @@ class ParquetReader : public GenericReader {
std::unique_ptr<RowGroupReader> _current_group_reader = nullptr;
// read to the end of current reader
bool _row_group_eof = true;
int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
std::map<std::string, int> _map_column; // column-name <---> column-index
int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
// table column name to file column name map. For iceberg schema evolution.
std::unordered_map<std::string, std::string> _table_col_to_file_col;
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
std::vector<ParquetReadColumn> _read_columns;
std::vector<std::string> _read_columns;
RowRange _whole_range = RowRange(0, 0);
const std::vector<int64_t>* _delete_rows = nullptr;
int64_t _delete_rows_index = 0;
Expand Down
4 changes: 2 additions & 2 deletions be/test/vec/exec/parquet/parquet_thrift_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,11 +499,11 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
SlotDescriptor string_slot(tslot_desc);
tuple_slots.emplace_back(&string_slot);

std::vector<ParquetReadColumn> read_columns;
std::vector<std::string> read_columns;
RowGroupReader::LazyReadContext lazy_read_ctx;
for (const auto& slot : tuple_slots) {
lazy_read_ctx.all_read_columns.emplace_back(slot->col_name());
read_columns.emplace_back(ParquetReadColumn(7, slot->col_name()));
read_columns.emplace_back(slot->col_name());
}
io::FileSystemSPtr local_fs = io::LocalFileSystem::create("");
io::FileReaderSPtr file_reader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@
10 {101:1, 102:1, 103:1} {102:10, 104:1, 105:2} {"field1":100, "field0":100} {"field2":3000000} {"field3":300000000} {"field4":3.14, "hello world":0.111, "hell0":7.001} {"field5":3.14159} {103:"Hello"} {"field6":2023-07-28 12:34:56.000000, "field000006":2023-07-08 12:34:57.000000, "field2432456":2023-07-28 12:34:50.000000} {"field7":2023-07-28} {1, 1, 20, 3000000, 44444444444, 3.14, 3.14159, "Hello", 2023-07-28 12:34:56.000000, 2023-07-28}
11 {101:1, 102:1, 13:1, 12:1} {102:10, 14:1, 15:2, 12:10} {"field1":100, "fie88ld0":100, "fieweld0":100, "fieeeld1":100, "fieeeld0":100, "feeield0":100, "feeield1":100, "firreld0":100, "field0":100} {"field2":3000000, "abcd":4000000, "1231":3000000} {"fi7eld3":300000000, "field30":300000000, "fielwwd3":300000000, "fi055":300000000, "field7":300000121323} {"field4":3.14, "hello world":0.111, "hell0":7.001} {"field5":3.14159} {103:"Hello", 0:"hello"} {"field6":2023-07-28 12:34:56.000000, "field000006":2023-07-08 12:34:57.000000, "field2432456":2023-07-28 12:34:50.000000} {"field7":2023-07-28} {1, 1, 20, 3000000, 44444444444, 3.14, 3.14159, "Hello", 2023-07-28 12:34:56.000000, 2023-07-28}

-- !filter_complex --
50000 50000 50000

Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ suite("test_hive_text_complex_type", "p2,external,hive,external_remote,external_

qt_sql2 """ select * from hive_text_complex_type_delimiter order by column1; """


qt_filter_complex """select count(column_primitive_integer),
count(column1_struct),
count(column_primitive_bigint)
from parquet_predicate_table where column_primitive_bigint = 6;"""
sql """drop catalog ${catalog_name};"""
}
}

Loading