Skip to content

Commit

Permalink
[fix](multi-catalog) conversion of compatible numerical types (#23113)
Browse files Browse the repository at this point in the history
Hive support schema change, but doesn't rewrite the parquet file, so the physical type of parquet file may not equal the logical type of table schema.
  • Loading branch information
AshinGau authored Aug 18, 2023
1 parent 4f7760a commit 795006e
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 12 deletions.
12 changes: 6 additions & 6 deletions be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ class FixLengthDictDecoder final : public BaseDictDecoder {

TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
switch (logical_type) {
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \
case NUMERIC_TYPE: \
if constexpr (std::is_same_v<T, PHYSICAL_TYPE>) { \
return _decode_numeric<CPP_NUMERIC_TYPE, has_filter>(doris_column, select_vector); \
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \
case NUMERIC_TYPE: \
if constexpr (!std::is_same_v<T, ParquetInt96>) { \
return _decode_numeric<CPP_NUMERIC_TYPE, T, has_filter>(doris_column, select_vector); \
}
FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
Expand Down Expand Up @@ -177,7 +177,7 @@ class FixLengthDictDecoder final : public BaseDictDecoder {
}

protected:
template <typename Numeric, bool has_filter>
template <typename Numeric, typename PhysicalType, bool has_filter>
Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) {
auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
size_t data_index = column_data.size();
Expand All @@ -189,7 +189,7 @@ class FixLengthDictDecoder final : public BaseDictDecoder {
case ColumnSelectVector::CONTENT: {
for (size_t i = 0; i < run_length; ++i) {
column_data[data_index++] =
static_cast<Numeric>(_dict_items[_indexes[dict_index++]]);
static_cast<PhysicalType>(_dict_items[_indexes[dict_index++]]);
}
break;
}
Expand Down
24 changes: 19 additions & 5 deletions be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,23 @@ Status FixLengthPlainDecoder::_decode_values(MutableColumnPtr& doris_column, Dat
}
TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
switch (logical_type) {
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \
case NUMERIC_TYPE: \
return _decode_numeric<CPP_NUMERIC_TYPE, has_filter>(doris_column, select_vector);
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \
case NUMERIC_TYPE: \
if (_physical_type == tparquet::Type::INT32) { \
return _decode_numeric<CPP_NUMERIC_TYPE, Int32, has_filter>(doris_column, \
select_vector); \
} else if (_physical_type == tparquet::Type::INT64) { \
return _decode_numeric<CPP_NUMERIC_TYPE, Int64, has_filter>(doris_column, \
select_vector); \
} else if (_physical_type == tparquet::Type::FLOAT) { \
return _decode_numeric<CPP_NUMERIC_TYPE, Float32, has_filter>(doris_column, \
select_vector); \
} else if (_physical_type == tparquet::Type::DOUBLE) { \
return _decode_numeric<CPP_NUMERIC_TYPE, Float64, has_filter>(doris_column, \
select_vector); \
} else { \
break; \
}
FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
case TypeIndex::Date:
Expand Down Expand Up @@ -207,7 +221,7 @@ Status FixLengthPlainDecoder::_decode_string(MutableColumnPtr& doris_column,
}
return Status::OK();
}
template <typename Numeric, bool has_filter>
template <typename Numeric, typename PhysicalType, bool has_filter>
Status FixLengthPlainDecoder::_decode_numeric(MutableColumnPtr& doris_column,
ColumnSelectVector& select_vector) {
auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
Expand All @@ -219,7 +233,7 @@ Status FixLengthPlainDecoder::_decode_numeric(MutableColumnPtr& doris_column,
case ColumnSelectVector::CONTENT: {
for (size_t i = 0; i < run_length; ++i) {
char* buf_start = _data->data + _offset;
column_data[data_index++] = *(Numeric*)buf_start;
column_data[data_index++] = *(PhysicalType*)buf_start;
_offset += _type_length;
}
break;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class FixLengthPlainDecoder final : public Decoder {
Status skip_values(size_t num_values) override;

protected:
template <typename Numeric, bool has_filter>
template <typename Numeric, typename PhysicalType, bool has_filter>
Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);

template <typename CppType, typename ColumnType, bool has_filter>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !schema_change --
1 1 1.0 1.0 1.0099999904632568 1.10101
2 2 2.0 2.0 2.0199999809265137 2.20202
3 3 3.0 3.0 3.0299999713897705 3.30303
123 6334 7898763.0 1.2837483628455E13 0.010101010091602802 0.09238498728784825

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_hive_schema_change", "p2,external,hive,external_remote,external_remote_hive") {
String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
String catalog_name = "test_hive_schema_change"
sql """drop catalog if exists ${catalog_name};"""
sql """
create catalog if not exists ${catalog_name} properties (
'type'='hms',
'hadoop.username' = 'hadoop',
'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
);
"""
sql """ switch ${catalog_name} """
sql """ use `default` """
qt_schema_change """ select * from schema_change order by tinyint_col """
sql """ drop catalog ${catalog_name} """
}
}

0 comments on commit 795006e

Please sign in to comment.