Skip to content

Commit

Permalink
[fix](multi-catalog) conversion of compatible numerical types (apache…
Browse files Browse the repository at this point in the history
…#23113)

Hive support schema change, but doesn't rewrite the parquet file, so the physical type of parquet file may not equal the logical type of table schema.
  • Loading branch information
AshinGau authored and airborne12 committed Aug 21, 2023
1 parent 177d7a8 commit aa65f0c
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 12 deletions.
12 changes: 6 additions & 6 deletions be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ class FixLengthDictDecoder final : public BaseDictDecoder {

TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
switch (logical_type) {
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \
case NUMERIC_TYPE: \
if constexpr (std::is_same_v<T, PHYSICAL_TYPE>) { \
return _decode_numeric<CPP_NUMERIC_TYPE, has_filter>(doris_column, select_vector); \
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \
case NUMERIC_TYPE: \
if constexpr (!std::is_same_v<T, ParquetInt96>) { \
return _decode_numeric<CPP_NUMERIC_TYPE, T, has_filter>(doris_column, select_vector); \
}
FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
Expand Down Expand Up @@ -177,7 +177,7 @@ class FixLengthDictDecoder final : public BaseDictDecoder {
}

protected:
template <typename Numeric, bool has_filter>
template <typename Numeric, typename PhysicalType, bool has_filter>
Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) {
auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
size_t data_index = column_data.size();
Expand All @@ -189,7 +189,7 @@ class FixLengthDictDecoder final : public BaseDictDecoder {
case ColumnSelectVector::CONTENT: {
for (size_t i = 0; i < run_length; ++i) {
column_data[data_index++] =
static_cast<Numeric>(_dict_items[_indexes[dict_index++]]);
static_cast<PhysicalType>(_dict_items[_indexes[dict_index++]]);
}
break;
}
Expand Down
24 changes: 19 additions & 5 deletions be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,23 @@ Status FixLengthPlainDecoder::_decode_values(MutableColumnPtr& doris_column, Dat
}
TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
switch (logical_type) {
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \
case NUMERIC_TYPE: \
return _decode_numeric<CPP_NUMERIC_TYPE, has_filter>(doris_column, select_vector);
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \
case NUMERIC_TYPE: \
if (_physical_type == tparquet::Type::INT32) { \
return _decode_numeric<CPP_NUMERIC_TYPE, Int32, has_filter>(doris_column, \
select_vector); \
} else if (_physical_type == tparquet::Type::INT64) { \
return _decode_numeric<CPP_NUMERIC_TYPE, Int64, has_filter>(doris_column, \
select_vector); \
} else if (_physical_type == tparquet::Type::FLOAT) { \
return _decode_numeric<CPP_NUMERIC_TYPE, Float32, has_filter>(doris_column, \
select_vector); \
} else if (_physical_type == tparquet::Type::DOUBLE) { \
return _decode_numeric<CPP_NUMERIC_TYPE, Float64, has_filter>(doris_column, \
select_vector); \
} else { \
break; \
}
FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
case TypeIndex::Date:
Expand Down Expand Up @@ -207,7 +221,7 @@ Status FixLengthPlainDecoder::_decode_string(MutableColumnPtr& doris_column,
}
return Status::OK();
}
template <typename Numeric, bool has_filter>
template <typename Numeric, typename PhysicalType, bool has_filter>
Status FixLengthPlainDecoder::_decode_numeric(MutableColumnPtr& doris_column,
ColumnSelectVector& select_vector) {
auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
Expand All @@ -219,7 +233,7 @@ Status FixLengthPlainDecoder::_decode_numeric(MutableColumnPtr& doris_column,
case ColumnSelectVector::CONTENT: {
for (size_t i = 0; i < run_length; ++i) {
char* buf_start = _data->data + _offset;
column_data[data_index++] = *(Numeric*)buf_start;
column_data[data_index++] = *(PhysicalType*)buf_start;
_offset += _type_length;
}
break;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class FixLengthPlainDecoder final : public Decoder {
Status skip_values(size_t num_values) override;

protected:
template <typename Numeric, bool has_filter>
template <typename Numeric, typename PhysicalType, bool has_filter>
Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);

template <typename CppType, typename ColumnType, bool has_filter>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !schema_change --
1 1 1.0 1.0 1.0099999904632568 1.10101
2 2 2.0 2.0 2.0199999809265137 2.20202
3 3 3.0 3.0 3.0299999713897705 3.30303
123 6334 7898763.0 1.2837483628455E13 0.010101010091602802 0.09238498728784825

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_hive_schema_change", "p2,external,hive,external_remote,external_remote_hive") {
String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
String catalog_name = "test_hive_schema_change"
sql """drop catalog if exists ${catalog_name};"""
sql """
create catalog if not exists ${catalog_name} properties (
'type'='hms',
'hadoop.username' = 'hadoop',
'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
);
"""
sql """ switch ${catalog_name} """
sql """ use `default` """
qt_schema_change """ select * from schema_change order by tinyint_col """
sql """ drop catalog ${catalog_name} """
}
}

0 comments on commit aa65f0c

Please sign in to comment.