[feature](hive)Support reading renamed Parquet Hive and Orc Hive tables. (#38432) (#38809)

Backport of #38432.

## Proposed changes
Add `hive_parquet_use_column_names` and `hive_orc_use_column_names`
session variables to support reading Hive tables whose columns have been renamed.

These two session variables are modeled on `parquet_use_column_names` and
`orc_use_column_names` in the `Trino` Hive connector.

Both session variables default to true. When set to false, the ORC/Parquet
readers access columns by their ordinal position in the Hive table definition
instead of by name.

For example:
```mysql
In Hive:
hive> create table tmp (a int, b string) stored as parquet;
hive> insert into table tmp values(1,"2");
hive> alter table tmp change column a new_a int;
hive> insert into table tmp values(2,"4");

In Doris:
mysql> set hive_parquet_use_column_names=true;
Query OK, 0 rows affected (0.00 sec)

mysql> select * from tmp;
+-------+------+
| new_a | b    |
+-------+------+
|  NULL | 2    |
|     2 | 4    |
+-------+------+
2 rows in set (0.02 sec)

mysql> set hive_parquet_use_column_names=false;
Query OK, 0 rows affected (0.00 sec)

mysql> select * from tmp;
+-------+------+
| new_a | b    |
+-------+------+
|     1 | 2    |
|     2 | 4    |
+-------+------+
2 rows in set (0.02 sec)
```

In Hive 3, `set parquet.column.index.access=true/false` and
`set orc.force.positional.evolution=true/false` control table reads in the same
way as these two session variables. However, for Parquet tables with renamed
fields inside a struct column, Hive and Doris behave differently.
hubgeter authored Aug 5, 2024
1 parent 53773ae commit 5d02c48
Showing 20 changed files with 803 additions and 35 deletions.
16 changes: 10 additions & 6 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -279,13 +279,15 @@ Status OrcReader::init_reader(
const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
const bool hive_use_column_names) {
_column_names = column_names;
_colname_to_value_range = colname_to_value_range;
_lazy_read_ctx.conjuncts = conjuncts;
_is_acid = is_acid;
_tuple_descriptor = tuple_descriptor;
_row_descriptor = row_descriptor;
_is_hive1_orc_or_use_idx = !hive_use_column_names;
if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) {
_not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(),
not_single_slot_filter_conjuncts->begin(),
@@ -337,10 +339,11 @@ Status OrcReader::_init_read_columns() {

// In old version slot_name_to_schema_pos may not be set in _scan_params
// TODO, should be removed in 2.2 or later
_is_hive1_orc = is_hive1_orc && _scan_params.__isset.slot_name_to_schema_pos;
_is_hive1_orc_or_use_idx = (is_hive1_orc || _is_hive1_orc_or_use_idx) &&
_scan_params.__isset.slot_name_to_schema_pos;
for (size_t i = 0; i < _column_names->size(); ++i) {
auto& col_name = (*_column_names)[i];
if (_is_hive1_orc) {
if (_is_hive1_orc_or_use_idx) {
auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
if (iter != _scan_params.slot_name_to_schema_pos.end()) {
int pos = iter->second;
@@ -375,9 +378,10 @@ Status OrcReader::_init_read_columns() {
_read_cols_lower_case.emplace_back(col_name);
// For hive engine, store the orc column name to schema column name map.
// This is for Hive 1.x orc file with internal column name _col0, _col1...
if (_is_hive1_orc) {
if (_is_hive1_orc_or_use_idx) {
_removed_acid_file_col_name_to_schema_col[orc_cols[pos]] = col_name;
}

_col_name_to_file_col_name[col_name] = read_col;
}
}
@@ -708,7 +712,7 @@ bool OrcReader::_init_search_argument(
if (iter == colname_to_value_range->end()) {
continue;
}
auto type_it = type_map.find(col_name);
auto type_it = type_map.find(_col_name_to_file_col_name[col_name]);
if (type_it == type_map.end()) {
continue;
}
@@ -913,7 +917,7 @@ Status OrcReader::_init_select_types(const orc::Type& type, int idx) {
std::string name;
// For hive engine, translate the column name in orc file to schema column name.
// This is for Hive 1.x which use internal column name _col0, _col1...
if (_is_hive1_orc) {
if (_is_hive1_orc_or_use_idx) {
name = _removed_acid_file_col_name_to_schema_col[type.getFieldName(i)];
} else {
name = get_field_name_lower_case(&type, i);
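The core mechanic throughout this file is ordinal resolution: when names no longer match, the table column at position i is backed by the file column at position i. A minimal standalone sketch of that mapping — a hypothetical helper with simplified types, not the actual OrcReader internals:

```cpp
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Sketch of ordinal-position resolution: each table column is mapped to the
// file column occupying the same ordinal slot, so a rename in the Hive
// metastore does not break reads of old files. Hypothetical helper, not the
// OrcReader code.
std::unordered_map<std::string, std::string> resolve_by_position(
        const std::vector<std::string>& table_cols,   // Hive table definition order
        const std::vector<std::string>& file_cols) {  // physical order in the ORC file
    std::unordered_map<std::string, std::string> table_to_file;
    for (size_t i = 0; i < table_cols.size() && i < file_cols.size(); ++i) {
        table_to_file[table_cols[i]] = file_cols[i];
    }
    return table_to_file;
}

int main() {
    // The table renamed column `a` -> `new_a`; the old file still stores `a`.
    auto mapping = resolve_by_position({"new_a", "b"}, {"a", "b"});
    std::cout << mapping["new_a"] << '\n'; // prints "a"
}
```

With the commit-message example, `new_a` resolves to the old file's `a`, which is why the first row reads 1 instead of NULL when the session variable is false.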
15 changes: 10 additions & 5 deletions be/src/vec/exec/format/orc/vorc_reader.h
@@ -139,14 +139,15 @@ class OrcReader : public GenericReader {
const std::string& ctz, io::IOContext* io_ctx, bool enable_lazy_mat = true);

~OrcReader() override;

// If you want to read the file by index instead of column name, set hive_use_column_names to false.
Status init_reader(
const std::vector<std::string>* column_names,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const VExprContextSPtrs& conjuncts, bool is_acid,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
const bool hive_use_column_names = true);

Status set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
@@ -570,9 +571,11 @@ class OrcReader : public GenericReader {
// This is used for Hive 1.x which use internal column name in Orc file.
// _col0, _col1...
std::unordered_map<std::string, std::string> _removed_acid_file_col_name_to_schema_col;
// Flag for hive engine. True if the external table engine is Hive1.x with orc col name
// as _col1, col2, ...
bool _is_hive1_orc = false;
// Flag for hive engine.
// 1. True if the external table engine is Hive1.x with orc col name as _col1, col2, ...
// 2. If true, use indexes instead of column names when reading orc tables.
bool _is_hive1_orc_or_use_idx = false;

std::unordered_map<std::string, std::string> _col_name_to_file_col_name;
std::unordered_map<std::string, const orc::Type*> _type_map;
std::vector<const orc::Type*> _col_orc_type;
@@ -621,6 +624,8 @@ class OrcReader : public GenericReader {
// resolve schema change
std::unordered_map<std::string, std::unique_ptr<converter::ColumnTypeConverter>> _converters;
//for iceberg table , when table column name != file column name
//TODO(CXY) : remove _table_col_to_file_col, because we have _col_name_to_file_col_name,
// the two have the same effect.
std::unordered_map<std::string, std::string> _table_col_to_file_col;
//support iceberg position delete .
std::vector<int64_t>* _position_delete_ordered_rowids = nullptr;
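For context on why the renamed flag also covers Hive 1.x: ORC files written by Hive 1.x carry internal column names `_col0`, `_col1`, ..., so name-based resolution is meaningless there and `slot_name_to_schema_pos` decides which file column backs each table column. A small hedged demo with simplified stand-ins for the reader's bookkeeping:

```cpp
#include <cstdio>
#include <map>
#include <string>
#include <vector>

int main() {
    // Hive 1.x ORC: physical columns carry internal names, not table names.
    std::vector<std::string> file_cols = {"_col0", "_col1"};
    // Table schema position for each slot, as carried in the scan params
    // (column names here are illustrative).
    std::map<std::string, int> slot_name_to_schema_pos = {{"new_a", 0}, {"b", 1}};
    for (const auto& [table_col, pos] : slot_name_to_schema_pos) {
        std::printf("%s -> %s\n", table_col.c_str(), file_cols[pos].c_str());
    }
    return 0;
}
```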
93 changes: 72 additions & 21 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -22,6 +22,7 @@
#include <gen_cpp/parquet_types.h>
#include <glog/logging.h>

#include <algorithm>
#include <functional>
#include <utility>

@@ -300,12 +301,14 @@ Status ParquetReader::init_reader(
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
bool filter_groups) {
bool filter_groups, const bool hive_use_column_names) {
_tuple_descriptor = tuple_descriptor;
_row_descriptor = row_descriptor;
_colname_to_slot_id = colname_to_slot_id;
_not_single_slot_filter_conjuncts = not_single_slot_filter_conjuncts;
_slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
_colname_to_value_range = colname_to_value_range;
_hive_use_column_names = hive_use_column_names;
if (_file_metadata == nullptr) {
return Status::InternalError("failed to init parquet reader, please open reader first");
}
@@ -320,28 +323,59 @@
// e.g. table added a column after this parquet file was written.
_column_names = &all_column_names;
auto schema_desc = _file_metadata->schema();
std::set<std::string> required_columns(all_column_names.begin(), all_column_names.end());
// Currently only used in iceberg, the columns are dropped but added back
std::set<std::string> dropped_columns(missing_column_names.begin(), missing_column_names.end());
// Make the order of read columns the same as physical order in parquet file
for (int i = 0; i < schema_desc.size(); ++i) {
auto name = schema_desc.get_column(i)->name;
// If the column in parquet file is included in all_column_names and not in missing_column_names,
// add it to _map_column, which means the reader should read the data of this column.
// Here to check against missing_column_names is for the 'Add a column back to the table
// with the same column name' case. (drop column a then add column a).
// Shouldn't read this column data in this case.
if (required_columns.find(name) != required_columns.end() &&
dropped_columns.find(name) == dropped_columns.end()) {
required_columns.erase(name);
_read_columns.emplace_back(name);
if (_hive_use_column_names) {
std::set<std::string> required_columns(all_column_names.begin(), all_column_names.end());
// Currently only used in iceberg, the columns are dropped but added back
std::set<std::string> dropped_columns(missing_column_names.begin(),
missing_column_names.end());
// Make the order of read columns the same as physical order in parquet file
for (int i = 0; i < schema_desc.size(); ++i) {
auto name = schema_desc.get_column(i)->name;
// If the column in parquet file is included in all_column_names and not in missing_column_names,
// add it to _map_column, which means the reader should read the data of this column.
// Here to check against missing_column_names is for the 'Add a column back to the table
// with the same column name' case. (drop column a then add column a).
// Shouldn't read this column data in this case.
if (required_columns.find(name) != required_columns.end() &&
dropped_columns.find(name) == dropped_columns.end()) {
required_columns.erase(name);
_read_columns.emplace_back(name);
}
}
for (const std::string& name : required_columns) {
_missing_cols.emplace_back(name);
}
} else {
std::unordered_map<std::string, ColumnValueRangeType> new_colname_to_value_range;
const auto& table_column_idxs = _scan_params.column_idxs;
std::map<int, int> table_col_id_to_idx;
for (int i = 0; i < table_column_idxs.size(); i++) {
table_col_id_to_idx.insert({table_column_idxs[i], i});
}
}
for (const std::string& name : required_columns) {
_missing_cols.emplace_back(name);
}

_colname_to_value_range = colname_to_value_range;
for (auto [id, idx] : table_col_id_to_idx) {
if (id >= schema_desc.size()) {
_missing_cols.emplace_back(all_column_names[idx]);
} else {
auto& table_col = all_column_names[idx];
auto file_col = schema_desc.get_column(id)->name;
_read_columns.emplace_back(file_col);

if (table_col != file_col) {
_table_col_to_file_col[table_col] = file_col;
auto iter = _colname_to_value_range->find(table_col);
if (iter == _colname_to_value_range->end()) {
continue;
}
new_colname_to_value_range[file_col] = iter->second;
_colname_to_value_range->erase(iter->first);
}
}
}
for (auto it : new_colname_to_value_range) {
_colname_to_value_range->emplace(it.first, std::move(it.second));
}
}
// build column predicates for column lazy read
_lazy_read_ctx.conjuncts = conjuncts;
RETURN_IF_ERROR(_init_row_groups(filter_groups));
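The `else` branch above does two things: read columns by position, and re-key any pushed-down predicate ranges from table column names to file column names. A self-contained sketch under simplified assumptions — a plain pair stands in for `ColumnValueRangeType`, and the `table_col_id_to_idx` indirection is collapsed:

```cpp
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

using Range = std::pair<int, int>;  // hypothetical stand-in for ColumnValueRangeType

int main() {
    std::vector<std::string> table_cols = {"new_a", "b", "added_later"};
    std::vector<std::string> file_cols = {"a", "b"};  // physical file schema
    std::vector<int> column_idxs = {0, 1, 2};         // table column ids for this scan

    std::vector<std::string> read_cols, missing_cols;
    std::unordered_map<std::string, Range> value_ranges = {{"new_a", {0, 10}}};
    std::unordered_map<std::string, Range> rekeyed;

    for (int id : column_idxs) {
        if (id >= static_cast<int>(file_cols.size())) {
            missing_cols.push_back(table_cols[id]);  // added after the file was written
            continue;
        }
        read_cols.push_back(file_cols[id]);          // read by position
        auto it = value_ranges.find(table_cols[id]);
        if (it != value_ranges.end()) {
            rekeyed[file_cols[id]] = it->second;     // predicate now targets the file column
            value_ranges.erase(it);
        }
    }
    for (auto& [k, v] : rekeyed) value_ranges.emplace(k, v);

    std::cout << read_cols[0] << ' ' << missing_cols[0] << ' '
              << value_ranges.count("a") << '\n';    // a added_later 1
}
```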
@@ -525,6 +559,16 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
return Status::OK();
}

if (!_hive_use_column_names) {
for (auto i = 0; i < block->get_names().size(); i++) {
auto& col = block->get_by_position(i);
if (_table_col_to_file_col.contains(col.name)) {
col.name = _table_col_to_file_col[col.name];
}
}
block->initialize_index_by_name();
}

SCOPED_RAW_TIMER(&_statistics.column_read_time);
Status batch_st =
_current_group_reader->next_batch(block, _batch_size, read_rows, &_row_group_eof);
@@ -535,6 +579,13 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
*eof = true;
return Status::OK();
}

if (!_hive_use_column_names) {
for (auto i = 0; i < block->columns(); i++) {
block->get_by_position(i).name = (*_column_names)[i];
}
block->initialize_index_by_name();
}
if (!batch_st.ok()) {
return Status::InternalError("Read parquet file {} failed, reason = {}", _scan_range.path,
batch_st.to_string());
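The block-name swap in `get_next_block` is easy to miss: names go table→file before the batch is read so the row-group reader matches the file schema, then file→table afterwards so callers still see table names. A hedged sketch with a hypothetical `Block` simplified from `doris::vectorized::Block`:

```cpp
#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct Block {
    std::vector<std::string> names;  // stand-in: real Block also holds columns
};

int main() {
    Block block{{"new_a", "b"}};
    std::vector<std::pair<std::string, std::string>> table_to_file = {{"new_a", "a"}};

    for (auto& name : block.names) {                  // before reading: table -> file
        for (const auto& [t, f] : table_to_file) {
            if (name == t) name = f;
        }
    }
    std::cout << block.names[0] << '\n';              // "a" while the batch is read

    std::vector<std::string> table_names = {"new_a", "b"};
    for (size_t i = 0; i < block.names.size(); ++i) { // after reading: restore
        block.names[i] = table_names[i];
    }
    std::cout << block.names[0] << '\n';              // "new_a" for the caller
}
```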
3 changes: 2 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -116,7 +116,7 @@ class ParquetReader : public GenericReader {
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
bool filter_groups = true);
bool filter_groups = true, const bool hive_use_column_names = true);

Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;

@@ -283,5 +283,6 @@ class ParquetReader : public GenericReader {
const std::unordered_map<std::string, int>* _colname_to_slot_id = nullptr;
const VExprContextSPtrs* _not_single_slot_filter_conjuncts = nullptr;
const std::unordered_map<int, VExprContextSPtrs>* _slot_id_to_filter_conjuncts = nullptr;
bool _hive_use_column_names = false;
};
} // namespace doris::vectorized
21 changes: 19 additions & 2 deletions be/src/vec/exec/scan/vfile_scanner.cpp
@@ -862,12 +862,21 @@ Status VFileScanner::_get_next_reader() {
RETURN_IF_ERROR(paimon_reader->init_row_filters(range));
_cur_reader = std::move(paimon_reader);
} else {
bool hive_parquet_use_column_names = true;

if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hive" && _state != nullptr)
[[likely]] {
hive_parquet_use_column_names =
_state->query_options().hive_parquet_use_column_names;
}

std::vector<std::string> place_holder;
init_status = parquet_reader->init_reader(
_file_col_names, place_holder, _colname_to_value_range,
_push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
_col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
&_slot_id_to_filter_conjuncts, true, hive_parquet_use_column_names);
_cur_reader = std::move(parquet_reader);
}
need_to_get_parsed_schema = true;
@@ -923,10 +932,18 @@ Status VFileScanner::_get_next_reader() {
RETURN_IF_ERROR(paimon_reader->init_row_filters(range));
_cur_reader = std::move(paimon_reader);
} else {
bool hive_orc_use_column_names = true;

if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hive" && _state != nullptr)
[[likely]] {
hive_orc_use_column_names = _state->query_options().hive_orc_use_column_names;
}
init_status = orc_reader->init_reader(
&_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
_real_tuple_desc, _default_val_row_desc.get(),
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
hive_orc_use_column_names);
_cur_reader = std::move(orc_reader);
}
need_to_get_parsed_schema = true;
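Note that the scanner only honors the session variables for splits explicitly tagged as plain Hive; every other table format keeps name-based resolution. The guard condenses to something like this sketch (simplified stand-in types, not the VFileScanner code):

```cpp
#include <iostream>
#include <optional>
#include <string>

// Positional resolution is opted into only for plain Hive splits; any other
// table format (iceberg, paimon, ...) keeps name-based resolution.
// Hypothetical stand-in for the scanner's guard.
bool use_column_names(const std::optional<std::string>& table_format_type,
                      bool session_use_column_names) {
    if (table_format_type && *table_format_type == "hive") {
        return session_use_column_names;  // honor the session variable
    }
    return true;  // default: resolve by name
}

int main() {
    std::cout << use_column_names(std::optional<std::string>("hive"), false)
              << '\n';                                           // 0: by position
    std::cout << use_column_names(std::nullopt, false) << '\n';  // 1: by name
}
```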
@@ -0,0 +1,29 @@
use default;

create table simulation_hive1_orc(
`a` boolean,
`b` int,
`c` string
)stored as orc
LOCATION '/user/doris/preinstalled_data/orc_table/simulation_hive1_orc';
msck repair table simulation_hive1_orc;

create table test_hive_rename_column_parquet(
`new_a` boolean,
`new_b` int,
`c` string,
`new_d` int,
`f` string
)stored as parquet
LOCATION '/user/doris/preinstalled_data/parquet_table/test_hive_rename_column_parquet';
msck repair table test_hive_rename_column_parquet;

create table test_hive_rename_column_orc(
`new_a` boolean,
`new_b` int,
`c` string,
`new_d` int,
`f` string
)stored as orc
LOCATION '/user/doris/preinstalled_data/orc_table/test_hive_rename_column_orc';
msck repair table test_hive_rename_column_orc;
9 binary data files (preinstalled ORC/Parquet test data) not shown.
@@ -421,6 +421,10 @@ private TScanRangeLocations splitToScanRange(
transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
} else if (fileSplit instanceof HiveSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value());
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}

setScanParams(rangeDesc, fileSplit);
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -556,6 +556,10 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_PUSHDOWN_MINMAX_ON_UNIQUE = "enable_pushdown_minmax_on_unique";

public static final String HIVE_PARQUET_USE_COLUMN_NAMES = "hive_parquet_use_column_names";

public static final String HIVE_ORC_USE_COLUMN_NAMES = "hive_orc_use_column_names";

public static final String KEEP_CARRIAGE_RETURN = "keep_carriage_return";

public static final String ENABLE_PUSHDOWN_STRING_MINMAX = "enable_pushdown_string_minmax";
@@ -1770,11 +1774,25 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
public int createTablePartitionMaxNum = 10000;


@VariableMgr.VarAttr(name = HIVE_PARQUET_USE_COLUMN_NAMES,
description = {"默认情况下按名称访问 Parquet 列。将此属性设置为“false”可按 Hive 表定义中的序号位置访问列。",
"Access Parquet columns by name by default. Set this property to `false` to access columns "
+ "by their ordinal position in the Hive table definition."})
public boolean hiveParquetUseColumnNames = true;


@VariableMgr.VarAttr(name = HIVE_ORC_USE_COLUMN_NAMES,
description = {"默认情况下按名称访问 Orc 列。将此属性设置为“false”可按 Hive 表定义中的序号位置访问列。",
"Access Parquet columns by name by default. Set this property to `false` to access columns "
+ "by their ordinal position in the Hive table definition."})
public boolean hiveOrcUseColumnNames = true;

@VariableMgr.VarAttr(name = KEEP_CARRIAGE_RETURN,
description = {"在同时处理\r\r\n作为CSV的行分隔符时,是否保留\r",
"When processing both \\n and \\r\\n as CSV line separators, should \\r be retained?"})
public boolean keepCarriageReturn = false;


@VariableMgr.VarAttr(name = FORCE_JNI_SCANNER,
description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"})
private boolean forceJniScanner = false;
@@ -3435,6 +3453,8 @@ public TQueryOptions toThrift() {

tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull);
tResult.setSerdeDialect(getSerdeDialect());
tResult.setHiveOrcUseColumnNames(hiveOrcUseColumnNames);
tResult.setHiveParquetUseColumnNames(hiveParquetUseColumnNames);
tResult.setKeepCarriageReturn(keepCarriageReturn);
return tResult;
}