[feature](hive) Support reading renamed Parquet Hive and Orc Hive tables. (#38432) #38809

Merged
16 changes: 10 additions & 6 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -279,13 +279,15 @@ Status OrcReader::init_reader(
const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
const bool hive_use_column_names) {
_column_names = column_names;
_colname_to_value_range = colname_to_value_range;
_lazy_read_ctx.conjuncts = conjuncts;
_is_acid = is_acid;
_tuple_descriptor = tuple_descriptor;
_row_descriptor = row_descriptor;
_is_hive1_orc_or_use_idx = !hive_use_column_names;
if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) {
_not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(),
not_single_slot_filter_conjuncts->begin(),
@@ -337,10 +339,11 @@ Status OrcReader::_init_read_columns() {

// In old versions, slot_name_to_schema_pos may not be set in _scan_params
// TODO: this compatibility fallback should be removed in 2.2 or later
_is_hive1_orc = is_hive1_orc && _scan_params.__isset.slot_name_to_schema_pos;
_is_hive1_orc_or_use_idx = (is_hive1_orc || _is_hive1_orc_or_use_idx) &&
_scan_params.__isset.slot_name_to_schema_pos;
for (size_t i = 0; i < _column_names->size(); ++i) {
auto& col_name = (*_column_names)[i];
if (_is_hive1_orc) {
if (_is_hive1_orc_or_use_idx) {
auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
if (iter != _scan_params.slot_name_to_schema_pos.end()) {
int pos = iter->second;
@@ -375,9 +378,10 @@ Status OrcReader::_init_read_columns() {
_read_cols_lower_case.emplace_back(col_name);
// For hive engine, store the orc column name to schema column name map.
// This is for Hive 1.x orc files with internal column names _col0, _col1...
if (_is_hive1_orc) {
if (_is_hive1_orc_or_use_idx) {
_removed_acid_file_col_name_to_schema_col[orc_cols[pos]] = col_name;
}

_col_name_to_file_col_name[col_name] = read_col;
}
}
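Conceptually, once `_is_hive1_orc_or_use_idx` is set, each table column is resolved to a physical ORC column by schema position instead of by name. Below is a minimal, hedged sketch of that lookup with simplified stand-in types (none of these are the actual Doris classes):

```cpp
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Sketch only: slot_name_to_schema_pos mirrors _scan_params.slot_name_to_schema_pos,
// and orc_cols mirrors the column names stored in the ORC footer (_col0, _col1, ...
// for Hive 1.x files, or stale pre-rename names after ALTER TABLE ... CHANGE COLUMN).
int main() {
    std::vector<std::string> orc_cols = {"_col0", "_col1", "_col2"};
    std::map<std::string, int> slot_name_to_schema_pos = {{"a", 0}, {"b", 1}, {"c", 2}};
    std::vector<std::string> table_cols = {"a", "c"};

    std::map<std::string, std::string> col_name_to_file_col_name;
    for (const auto& col_name : table_cols) {
        auto iter = slot_name_to_schema_pos.find(col_name);
        if (iter == slot_name_to_schema_pos.end()) {
            continue; // column absent from the mapping; treated as missing elsewhere
        }
        // Resolve by ordinal position, ignoring the name stored in the file.
        col_name_to_file_col_name[col_name] = orc_cols[iter->second];
    }
    for (const auto& [table_col, file_col] : col_name_to_file_col_name) {
        std::cout << table_col << " -> " << file_col << "\n"; // a -> _col0, c -> _col2
    }
    return 0;
}
```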
@@ -708,7 +712,7 @@ bool OrcReader::_init_search_argument(
if (iter == colname_to_value_range->end()) {
continue;
}
auto type_it = type_map.find(col_name);
auto type_it = type_map.find(_col_name_to_file_col_name[col_name]);
if (type_it == type_map.end()) {
continue;
}
@@ -913,7 +917,7 @@ Status OrcReader::_init_select_types(const orc::Type& type, int idx) {
std::string name;
// For hive engine, translate the column name in orc file to schema column name.
// This is for Hive 1.x, which uses internal column names _col0, _col1...
if (_is_hive1_orc) {
if (_is_hive1_orc_or_use_idx) {
name = _removed_acid_file_col_name_to_schema_col[type.getFieldName(i)];
} else {
name = get_field_name_lower_case(&type, i);
15 changes: 10 additions & 5 deletions be/src/vec/exec/format/orc/vorc_reader.h
@@ -139,14 +139,15 @@ class OrcReader : public GenericReader {
const std::string& ctz, io::IOContext* io_ctx, bool enable_lazy_mat = true);

~OrcReader() override;

// If you want to read the file by column index instead of column name, set hive_use_column_names to false.
Status init_reader(
const std::vector<std::string>* column_names,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const VExprContextSPtrs& conjuncts, bool is_acid,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
const bool hive_use_column_names = true);

Status set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
@@ -570,9 +571,11 @@ class OrcReader : public GenericReader {
// This is used for Hive 1.x, which uses internal column names in the Orc file.
// _col0, _col1...
std::unordered_map<std::string, std::string> _removed_acid_file_col_name_to_schema_col;
// Flag for hive engine. True if the external table engine is Hive1.x with orc col name
// as _col1, col2, ...
bool _is_hive1_orc = false;
// Flag for the hive engine.
// 1. True if the external table is a Hive 1.x table whose orc file stores internal
// column names _col0, _col1, ...
// 2. Also set when hive_use_column_names = false, i.e. orc columns are read by index
// instead of by name.
bool _is_hive1_orc_or_use_idx = false;

std::unordered_map<std::string, std::string> _col_name_to_file_col_name;
std::unordered_map<std::string, const orc::Type*> _type_map;
std::vector<const orc::Type*> _col_orc_type;
@@ -621,6 +624,8 @@ class OrcReader : public GenericReader {
// resolve schema change
std::unordered_map<std::string, std::unique_ptr<converter::ColumnTypeConverter>> _converters;
// for iceberg tables, when table column name != file column name
// TODO(CXY): remove _table_col_to_file_col, because we have _col_name_to_file_col_name;
// the two have the same effect.
std::unordered_map<std::string, std::string> _table_col_to_file_col;
// support iceberg position delete.
std::vector<int64_t>* _position_delete_ordered_rowids = nullptr;
93 changes: 72 additions & 21 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -22,6 +22,7 @@
#include <gen_cpp/parquet_types.h>
#include <glog/logging.h>

#include <algorithm>
#include <functional>
#include <utility>

@@ -300,12 +301,14 @@ Status ParquetReader::init_reader(
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
bool filter_groups) {
bool filter_groups, const bool hive_use_column_names) {
_tuple_descriptor = tuple_descriptor;
_row_descriptor = row_descriptor;
_colname_to_slot_id = colname_to_slot_id;
_not_single_slot_filter_conjuncts = not_single_slot_filter_conjuncts;
_slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
_colname_to_value_range = colname_to_value_range;
_hive_use_column_names = hive_use_column_names;
if (_file_metadata == nullptr) {
return Status::InternalError("failed to init parquet reader, please open reader first");
}
@@ -320,28 +323,59 @@
// e.g. table added a column after this parquet file was written.
_column_names = &all_column_names;
auto schema_desc = _file_metadata->schema();
std::set<std::string> required_columns(all_column_names.begin(), all_column_names.end());
// Currently only used in iceberg, the columns are dropped but added back
std::set<std::string> dropped_columns(missing_column_names.begin(), missing_column_names.end());
// Make the order of read columns the same as physical order in parquet file
for (int i = 0; i < schema_desc.size(); ++i) {
auto name = schema_desc.get_column(i)->name;
// If the column in parquet file is included in all_column_names and not in missing_column_names,
// add it to _map_column, which means the reader should read the data of this column.
// Here to check against missing_column_names is for the 'Add a column back to the table
// with the same column name' case. (drop column a then add column a).
// Shouldn't read this column data in this case.
if (required_columns.find(name) != required_columns.end() &&
dropped_columns.find(name) == dropped_columns.end()) {
required_columns.erase(name);
_read_columns.emplace_back(name);
if (_hive_use_column_names) {
std::set<std::string> required_columns(all_column_names.begin(), all_column_names.end());
// Currently only used in iceberg, the columns are dropped but added back
std::set<std::string> dropped_columns(missing_column_names.begin(),
missing_column_names.end());
// Make the order of read columns the same as physical order in parquet file
for (int i = 0; i < schema_desc.size(); ++i) {
auto name = schema_desc.get_column(i)->name;
// If the column in parquet file is included in all_column_names and not in missing_column_names,
// add it to _map_column, which means the reader should read the data of this column.
// Here to check against missing_column_names is for the 'Add a column back to the table
// with the same column name' case. (drop column a then add column a).
// Shouldn't read this column data in this case.
if (required_columns.find(name) != required_columns.end() &&
dropped_columns.find(name) == dropped_columns.end()) {
required_columns.erase(name);
_read_columns.emplace_back(name);
}
}
for (const std::string& name : required_columns) {
_missing_cols.emplace_back(name);
}
} else {
std::unordered_map<std::string, ColumnValueRangeType> new_colname_to_value_range;
const auto& table_column_idxs = _scan_params.column_idxs;
std::map<int, int> table_col_id_to_idx;
for (int i = 0; i < table_column_idxs.size(); i++) {
table_col_id_to_idx.insert({table_column_idxs[i], i});
}
}
for (const std::string& name : required_columns) {
_missing_cols.emplace_back(name);
}

_colname_to_value_range = colname_to_value_range;
for (auto [id, idx] : table_col_id_to_idx) {
if (id >= schema_desc.size()) {
_missing_cols.emplace_back(all_column_names[idx]);
} else {
auto& table_col = all_column_names[idx];
auto file_col = schema_desc.get_column(id)->name;
_read_columns.emplace_back(file_col);

if (table_col != file_col) {
_table_col_to_file_col[table_col] = file_col;
auto iter = _colname_to_value_range->find(table_col);
if (iter == _colname_to_value_range->end()) {
continue;
}
new_colname_to_value_range[file_col] = iter->second;
_colname_to_value_range->erase(iter);
}
}
}
for (auto& it : new_colname_to_value_range) {
_colname_to_value_range->emplace(it.first, std::move(it.second));
}
}
// build column predicates for column lazy read
_lazy_read_ctx.conjuncts = conjuncts;
RETURN_IF_ERROR(_init_row_groups(filter_groups));
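The positional branch above does two things: it maps each requested table column to the file column at the same ordinal id, and it re-keys any pushdown value ranges from the table name to the physical file name. A hedged, self-contained sketch with simplified stand-in types (a plain string stands in for ColumnValueRangeType; the sample names are illustrative only, not the Doris API):

```cpp
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Sketch only: column_idxs holds each requested column's ordinal id in the table
// schema (like _scan_params.column_idxs); file_schema is the column order actually
// stored in the Parquet footer, written before the rename / column addition.
int main() {
    std::vector<std::string> table_cols = {"new_a", "new_b", "c", "new_d"};
    std::vector<int> column_idxs = {0, 1, 2, 3};
    std::vector<std::string> file_schema = {"a", "b", "c"};
    std::unordered_map<std::string, std::string> value_ranges = {{"new_b", "[1,10]"}};

    std::unordered_map<std::string, std::string> table_col_to_file_col;
    std::vector<std::string> read_columns;
    std::vector<std::string> missing_cols;
    for (size_t i = 0; i < column_idxs.size(); ++i) {
        int id = column_idxs[i];
        if (id >= static_cast<int>(file_schema.size())) {
            missing_cols.push_back(table_cols[i]); // column added after the file was written
            continue;
        }
        const std::string& file_col = file_schema[id];
        read_columns.push_back(file_col);
        if (table_cols[i] != file_col) {
            table_col_to_file_col[table_cols[i]] = file_col;
            auto iter = value_ranges.find(table_cols[i]);
            if (iter != value_ranges.end()) {
                // Re-key the pushdown range to the physical column name.
                std::string range = std::move(iter->second);
                value_ranges.erase(iter);
                value_ranges[file_col] = std::move(range);
            }
        }
    }
    std::cout << read_columns.size() << " columns read, " << missing_cols.size()
              << " missing, range now keyed on 'b': " << value_ranges.count("b") << "\n";
    return 0;
}
```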
@@ -525,6 +559,16 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
return Status::OK();
}

if (!_hive_use_column_names) {
for (size_t i = 0; i < block->columns(); i++) {
auto& col = block->get_by_position(i);
if (_table_col_to_file_col.contains(col.name)) {
col.name = _table_col_to_file_col[col.name];
}
}
block->initialize_index_by_name();
}

SCOPED_RAW_TIMER(&_statistics.column_read_time);
Status batch_st =
_current_group_reader->next_batch(block, _batch_size, read_rows, &_row_group_eof);
@@ -535,6 +579,13 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
*eof = true;
return Status::OK();
}

if (!_hive_use_column_names) {
for (size_t i = 0; i < block->columns(); i++) {
block->get_by_position(i).name = (*_column_names)[i];
}
block->initialize_index_by_name();
}
if (!batch_st.ok()) {
return Status::InternalError("Read parquet file {} failed, reason = {}", _scan_range.path,
batch_st.to_string());
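Taken together, the two `!_hive_use_column_names` blocks form a round trip: the block's column names are rewritten from table names to file names just before the row-group read, so the group reader matches what is physically in the file, and then restored by position from `_column_names` once the batch is produced; `initialize_index_by_name()` is re-run after each rewrite so by-name lookups stay consistent.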
3 changes: 2 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -116,7 +116,7 @@ class ParquetReader : public GenericReader {
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
bool filter_groups = true);
bool filter_groups = true, const bool hive_use_column_names = true);

Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;

@@ -283,5 +283,6 @@ class ParquetReader : public GenericReader {
const std::unordered_map<std::string, int>* _colname_to_slot_id = nullptr;
const VExprContextSPtrs* _not_single_slot_filter_conjuncts = nullptr;
const std::unordered_map<int, VExprContextSPtrs>* _slot_id_to_filter_conjuncts = nullptr;
bool _hive_use_column_names = false;
};
} // namespace doris::vectorized
21 changes: 19 additions & 2 deletions be/src/vec/exec/scan/vfile_scanner.cpp
@@ -862,12 +862,21 @@ Status VFileScanner::_get_next_reader() {
RETURN_IF_ERROR(paimon_reader->init_row_filters(range));
_cur_reader = std::move(paimon_reader);
} else {
bool hive_parquet_use_column_names = true;

if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hive" && _state != nullptr)
[[likely]] {
hive_parquet_use_column_names =
_state->query_options().hive_parquet_use_column_names;
}

std::vector<std::string> place_holder;
init_status = parquet_reader->init_reader(
_file_col_names, place_holder, _colname_to_value_range,
_push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
_col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
&_slot_id_to_filter_conjuncts, true, hive_parquet_use_column_names);
_cur_reader = std::move(parquet_reader);
}
need_to_get_parsed_schema = true;
@@ -923,10 +932,18 @@ Status VFileScanner::_get_next_reader() {
RETURN_IF_ERROR(paimon_reader->init_row_filters(range));
_cur_reader = std::move(paimon_reader);
} else {
bool hive_orc_use_column_names = true;

if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hive" && _state != nullptr)
[[likely]] {
hive_orc_use_column_names = _state->query_options().hive_orc_use_column_names;
}
init_status = orc_reader->init_reader(
&_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
_real_tuple_desc, _default_val_row_desc.get(),
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
hive_orc_use_column_names);
_cur_reader = std::move(orc_reader);
}
need_to_get_parsed_schema = true;
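In both the Parquet and ORC branches the flag comes from the query options that the FE serializes into TQueryOptions (see the SessionVariable changes below), and it is consulted only for splits whose table_format_type is "hive"; iceberg, paimon, and transactional-hive paths keep the existing name-based behavior.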
@@ -0,0 +1,29 @@
use default;

create table simulation_hive1_orc(
`a` boolean,
`b` int,
`c` string
)stored as orc
LOCATION '/user/doris/preinstalled_data/orc_table/simulation_hive1_orc';
msck repair table simulation_hive1_orc;

create table test_hive_rename_column_parquet(
`new_a` boolean,
`new_b` int,
`c` string,
`new_d` int,
`f` string
)stored as parquet
LOCATION '/user/doris/preinstalled_data/parquet_table/test_hive_rename_column_parquet';
msck repair table test_hive_rename_column_parquet;

create table test_hive_rename_column_orc(
`new_a` boolean,
`new_b` int,
`c` string,
`new_d` int,
`f` string
)stored as orc
LOCATION '/user/doris/preinstalled_data/orc_table/test_hive_rename_column_orc';
msck repair table test_hive_rename_column_orc;
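These tables back the new regression cases: simulation_hive1_orc evidently points at an ORC file written with the Hive 1.x internal names (_col0, _col1, ...), and the two test_hive_rename_column_* tables point at data files written before the columns were renamed to new_a/new_b/new_d, so they read correctly only when the reader falls back to positional matching.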
Binary files not shown (9 files).
@@ -434,6 +434,10 @@ private TScanRangeLocations splitToScanRange(
transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
} else if (fileSplit instanceof HiveSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value());
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}

setScanParams(rangeDesc, fileSplit);
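This marks plain Hive splits with a TTableFormatFileDesc of type HIVE, which is what the BE-side check on range.table_format_params.table_format_type == "hive" in vfile_scanner.cpp keys on.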
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -556,6 +556,10 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_PUSHDOWN_MINMAX_ON_UNIQUE = "enable_pushdown_minmax_on_unique";

public static final String HIVE_PARQUET_USE_COLUMN_NAMES = "hive_parquet_use_column_names";

public static final String HIVE_ORC_USE_COLUMN_NAMES = "hive_orc_use_column_names";

public static final String KEEP_CARRIAGE_RETURN = "keep_carriage_return";

public static final String ENABLE_PUSHDOWN_STRING_MINMAX = "enable_pushdown_string_minmax";
@@ -1770,11 +1774,25 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
public int createTablePartitionMaxNum = 10000;


@VariableMgr.VarAttr(name = HIVE_PARQUET_USE_COLUMN_NAMES,
description = {"默认情况下按名称访问 Parquet 列。将此属性设置为“false”可按 Hive 表定义中的序号位置访问列。",
"Access Parquet columns by name by default. Set this property to `false` to access columns "
+ "by their ordinal position in the Hive table definition."})
public boolean hiveParquetUseColumnNames = true;


@VariableMgr.VarAttr(name = HIVE_ORC_USE_COLUMN_NAMES,
description = {"默认情况下按名称访问 Orc 列。将此属性设置为“false”可按 Hive 表定义中的序号位置访问列。",
"Access Parquet columns by name by default. Set this property to `false` to access columns "
+ "by their ordinal position in the Hive table definition."})
public boolean hiveOrcUseColumnNames = true;

@VariableMgr.VarAttr(name = KEEP_CARRIAGE_RETURN,
description = {"在同时处理\r和\r\n作为CSV的行分隔符时,是否保留\r",
"When processing both \\n and \\r\\n as CSV line separators, should \\r be retained?"})
public boolean keepCarriageReturn = false;


@VariableMgr.VarAttr(name = FORCE_JNI_SCANNER,
description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"})
private boolean forceJniScanner = false;
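Both switches default to true, preserving the old name-based matching, and can be toggled per session; for example, `set hive_parquet_use_column_names=false;` or `set hive_orc_use_column_names=false;` should switch the corresponding reader to ordinal-position matching for renamed Hive tables.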
@@ -3435,6 +3453,8 @@ public TQueryOptions toThrift() {

tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull);
tResult.setSerdeDialect(getSerdeDialect());
tResult.setHiveOrcUseColumnNames(hiveOrcUseColumnNames);
tResult.setHiveParquetUseColumnNames(hiveParquetUseColumnNames);
tResult.setKeepCarriageReturn(keepCarriageReturn);
return tResult;
}